diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/README.md | 1 | ||||
-rw-r--r-- | streaming/common.h | 9 | ||||
-rw-r--r-- | streaming/compression.c | 780 | ||||
-rw-r--r-- | streaming/compression.h | 175 | ||||
-rw-r--r-- | streaming/compression_brotli.c | 142 | ||||
-rw-r--r-- | streaming/compression_brotli.h | 15 | ||||
-rw-r--r-- | streaming/compression_gzip.c | 164 | ||||
-rw-r--r-- | streaming/compression_gzip.h | 15 | ||||
-rw-r--r-- | streaming/compression_lz4.c | 143 | ||||
-rw-r--r-- | streaming/compression_lz4.h | 19 | ||||
-rw-r--r-- | streaming/compression_zstd.c | 163 | ||||
-rw-r--r-- | streaming/compression_zstd.h | 19 | ||||
-rw-r--r-- | streaming/receiver.c | 420 | ||||
-rw-r--r-- | streaming/replication.c | 115 | ||||
-rw-r--r-- | streaming/rrdpush.c | 419 | ||||
-rw-r--r-- | streaming/rrdpush.h | 224 | ||||
-rw-r--r-- | streaming/sender.c | 581 | ||||
-rw-r--r-- | streaming/stream.conf | 13 |
18 files changed, 2625 insertions, 792 deletions
diff --git a/streaming/README.md b/streaming/README.md index a27167bc1..03de090e0 100644 --- a/streaming/README.md +++ b/streaming/README.md @@ -44,6 +44,7 @@ node**. This file is automatically generated by Netdata the first time it is sta | `buffer size bytes` | `10485760` | The size of the buffer to use when sending metrics. The default `10485760` equals a buffer of 10MB, which is good for 60 seconds of data. Increase this if you expect latencies higher than that. The buffer is flushed on reconnect. | | `reconnect delay seconds` | `5` | How long to wait until retrying to connect to the parent node. | | `initial clock resync iterations` | `60` | Sync the clock of charts for how many seconds when starting. | +| `parent using h2o` | `no` | Set to yes if you are connecting to parent trough it's h2o webserver/port. Currently there is no reason to set this to `yes` unless you are testing the new h2o based netdata webserver. When production ready this will be set to `yes` as default. | ### `[API_KEY]` and `[MACHINE_GUID]` sections diff --git a/streaming/common.h b/streaming/common.h new file mode 100644 index 000000000..b7292f4d0 --- /dev/null +++ b/streaming/common.h @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef STREAMING_COMMON_H +#define STREAMING_COMMON_H + +#define NETDATA_STREAM_URL "/stream" +#define NETDATA_STREAM_PROTO_NAME "netdata_stream/2.0" + +#endif /* STREAMING_COMMON_H */ diff --git a/streaming/compression.c b/streaming/compression.c index 6d4a128b0..a94c8a0a6 100644 --- a/streaming/compression.c +++ b/streaming/compression.c @@ -1,181 +1,707 @@ -#include "rrdpush.h" +// SPDX-License-Identifier: GPL-3.0-or-later -#ifdef ENABLE_RRDPUSH_COMPRESSION -#include "lz4.h" +#include "compression.h" -#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION" +#include "compression_gzip.h" -/* - * Reset compressor state for a new stream - */ -void rrdpush_compressor_reset(struct compressor_state *state) { - if(!state->initialized) { - state->initialized = true; +#ifdef ENABLE_LZ4 +#include "compression_lz4.h" +#endif + +#ifdef ENABLE_ZSTD +#include "compression_zstd.h" +#endif - state->stream.lz4_stream = LZ4_createStream(); - state->stream.input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2); - state->stream.input_ring_buffer = callocz(1, state->stream.input_ring_buffer_size); - state->compression_result_buffer_size = 0; +#ifdef ENABLE_BROTLI +#include "compression_brotli.h" +#endif + +int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX] = { + [COMPRESSION_ALGORITHM_NONE] = 0, + [COMPRESSION_ALGORITHM_ZSTD] = 3, // 1 (faster) - 22 (smaller) + [COMPRESSION_ALGORITHM_LZ4] = 1, // 1 (smaller) - 9 (faster) + [COMPRESSION_ALGORITHM_BROTLI] = 3, // 0 (faster) - 11 (smaller) + [COMPRESSION_ALGORITHM_GZIP] = 1, // 1 (faster) - 9 (smaller) +}; + +void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order) { + // empty all slots + for(size_t i = 0; i < COMPRESSION_ALGORITHM_MAX ;i++) + rpt->config.compression_priorities[i] = STREAM_CAP_NONE; + + char *s = strdupz(order); + + char *words[COMPRESSION_ALGORITHM_MAX + 100] = { NULL }; + size_t num_words = quoted_strings_splitter_pluginsd(s, words, COMPRESSION_ALGORITHM_MAX + 100); + size_t slot = 0; + STREAM_CAPABILITIES added = STREAM_CAP_NONE; + for(size_t i = 0; i < num_words && slot < COMPRESSION_ALGORITHM_MAX ;i++) { + if((STREAM_CAP_ZSTD_AVAILABLE) && strcasecmp(words[i], "zstd") == 0 && !(added & STREAM_CAP_ZSTD)) { + rpt->config.compression_priorities[slot++] = STREAM_CAP_ZSTD; + added |= STREAM_CAP_ZSTD; + } + else if((STREAM_CAP_LZ4_AVAILABLE) && strcasecmp(words[i], "lz4") == 0 && !(added & STREAM_CAP_LZ4)) { + rpt->config.compression_priorities[slot++] = STREAM_CAP_LZ4; + added |= STREAM_CAP_LZ4; + } + else if((STREAM_CAP_BROTLI_AVAILABLE) && strcasecmp(words[i], "brotli") == 0 && !(added & STREAM_CAP_BROTLI)) { + rpt->config.compression_priorities[slot++] = STREAM_CAP_BROTLI; + added |= STREAM_CAP_BROTLI; + } + else if(strcasecmp(words[i], "gzip") == 0 && !(added & STREAM_CAP_GZIP)) { + rpt->config.compression_priorities[slot++] = STREAM_CAP_GZIP; + added |= STREAM_CAP_GZIP; + } } - LZ4_resetStream_fast(state->stream.lz4_stream); + freez(s); + + // make sure all participate + if((STREAM_CAP_ZSTD_AVAILABLE) && slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_ZSTD)) + rpt->config.compression_priorities[slot++] = STREAM_CAP_ZSTD; + if((STREAM_CAP_LZ4_AVAILABLE) && slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_LZ4)) + rpt->config.compression_priorities[slot++] = STREAM_CAP_LZ4; + if((STREAM_CAP_BROTLI_AVAILABLE) && slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_BROTLI)) + rpt->config.compression_priorities[slot++] = STREAM_CAP_BROTLI; + if(slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_GZIP)) + rpt->config.compression_priorities[slot++] = STREAM_CAP_GZIP; +} + +void rrdpush_select_receiver_compression_algorithm(struct receiver_state *rpt) { + if (!rpt->config.rrdpush_compression) + rpt->capabilities &= ~STREAM_CAP_COMPRESSIONS_AVAILABLE; + + // select the right compression before sending our capabilities to the child + if(stream_has_more_than_one_capability_of(rpt->capabilities, STREAM_CAP_COMPRESSIONS_AVAILABLE)) { + STREAM_CAPABILITIES compressions = rpt->capabilities & STREAM_CAP_COMPRESSIONS_AVAILABLE; + for(int i = 0; i < COMPRESSION_ALGORITHM_MAX; i++) { + STREAM_CAPABILITIES c = rpt->config.compression_priorities[i]; + + if(!(c & STREAM_CAP_COMPRESSIONS_AVAILABLE)) + continue; + + if(compressions & c) { + STREAM_CAPABILITIES exclude = compressions; + exclude &= ~c; - state->stream.input_ring_buffer_pos = 0; + rpt->capabilities &= ~exclude; + break; + } + } + } } -/* - * Destroy compressor state and all related data - */ -void rrdpush_compressor_destroy(struct compressor_state *state) { - if (state->stream.lz4_stream) { - LZ4_freeStream(state->stream.lz4_stream); - state->stream.lz4_stream = NULL; +bool rrdpush_compression_initialize(struct sender_state *s) { + rrdpush_compressor_destroy(&s->compressor); + + // IMPORTANT + // KEEP THE SAME ORDER IN DECOMPRESSION + + if(stream_has_capability(s, STREAM_CAP_ZSTD)) + s->compressor.algorithm = COMPRESSION_ALGORITHM_ZSTD; + else if(stream_has_capability(s, STREAM_CAP_LZ4)) + s->compressor.algorithm = COMPRESSION_ALGORITHM_LZ4; + else if(stream_has_capability(s, STREAM_CAP_BROTLI)) + s->compressor.algorithm = COMPRESSION_ALGORITHM_BROTLI; + else if(stream_has_capability(s, STREAM_CAP_GZIP)) + s->compressor.algorithm = COMPRESSION_ALGORITHM_GZIP; + else + s->compressor.algorithm = COMPRESSION_ALGORITHM_NONE; + + if(s->compressor.algorithm != COMPRESSION_ALGORITHM_NONE) { + s->compressor.level = rrdpush_compression_levels[s->compressor.algorithm]; + rrdpush_compressor_init(&s->compressor); + return true; } - freez(state->stream.input_ring_buffer); - state->stream.input_ring_buffer = NULL; + return false; +} - freez(state->compression_result_buffer); - state->compression_result_buffer = NULL; +bool rrdpush_decompression_initialize(struct receiver_state *rpt) { + rrdpush_decompressor_destroy(&rpt->decompressor); + + // IMPORTANT + // KEEP THE SAME ORDER IN COMPRESSION + + if(stream_has_capability(rpt, STREAM_CAP_ZSTD)) + rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_ZSTD; + else if(stream_has_capability(rpt, STREAM_CAP_LZ4)) + rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_LZ4; + else if(stream_has_capability(rpt, STREAM_CAP_BROTLI)) + rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_BROTLI; + else if(stream_has_capability(rpt, STREAM_CAP_GZIP)) + rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_GZIP; + else + rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_NONE; + + if(rpt->decompressor.algorithm != COMPRESSION_ALGORITHM_NONE) { + rrdpush_decompressor_init(&rpt->decompressor); + return true; + } - state->initialized = false; + return false; } /* - * Compress the given block of data - * Compressed data will remain in the internal buffer until the next invocation - * Return the size of compressed data block as result and the pointer to internal buffer using the last argument - * or 0 in case of error - */ -size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out) { - if(unlikely(!state || !size || !out)) - return 0; - - if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) { - netdata_log_error("RRDPUSH COMPRESS: Compression Failed - Message size %lu above compression buffer limit: %d", - (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE); - return 0; +* In case of stream compression buffer overflow +* Inform the user through the error log file and +* deactivate compression by downgrading the stream protocol. +*/ +void rrdpush_compression_deactivate(struct sender_state *s) { + switch(s->compressor.algorithm) { + case COMPRESSION_ALGORITHM_MAX: + case COMPRESSION_ALGORITHM_NONE: + netdata_log_error("STREAM_COMPRESSION: compression error on 'host:%s' without any compression enabled. Ignoring error.", + rrdhost_hostname(s->host)); + break; + + case COMPRESSION_ALGORITHM_GZIP: + netdata_log_error("STREAM_COMPRESSION: GZIP compression error on 'host:%s'. Disabling GZIP for this node.", + rrdhost_hostname(s->host)); + s->disabled_capabilities |= STREAM_CAP_GZIP; + break; + + case COMPRESSION_ALGORITHM_LZ4: + netdata_log_error("STREAM_COMPRESSION: LZ4 compression error on 'host:%s'. Disabling ZSTD for this node.", + rrdhost_hostname(s->host)); + s->disabled_capabilities |= STREAM_CAP_LZ4; + break; + + case COMPRESSION_ALGORITHM_ZSTD: + netdata_log_error("STREAM_COMPRESSION: ZSTD compression error on 'host:%s'. Disabling ZSTD for this node.", + rrdhost_hostname(s->host)); + s->disabled_capabilities |= STREAM_CAP_ZSTD; + break; + + case COMPRESSION_ALGORITHM_BROTLI: + netdata_log_error("STREAM_COMPRESSION: BROTLI compression error on 'host:%s'. Disabling BROTLI for this node.", + rrdhost_hostname(s->host)); + s->disabled_capabilities |= STREAM_CAP_BROTLI; + break; } +} - size_t max_dst_size = LZ4_COMPRESSBOUND(size); - size_t data_size = max_dst_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE; +// ---------------------------------------------------------------------------- +// compressor public API - if (!state->compression_result_buffer) { - state->compression_result_buffer = mallocz(data_size); - state->compression_result_buffer_size = data_size; +void rrdpush_compressor_init(struct compressor_state *state) { + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + rrdpush_compressor_init_zstd(state); + break; +#endif + +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + rrdpush_compressor_init_lz4(state); + break; +#endif + +#ifdef ENABLE_BROTLI + case COMPRESSION_ALGORITHM_BROTLI: + rrdpush_compressor_init_brotli(state); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + rrdpush_compressor_init_gzip(state); + break; } - else if(unlikely(state->compression_result_buffer_size < data_size)) { - state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size); - state->compression_result_buffer_size = data_size; + + simple_ring_buffer_reset(&state->input); + simple_ring_buffer_reset(&state->output); +} + +void rrdpush_compressor_destroy(struct compressor_state *state) { + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + rrdpush_compressor_destroy_zstd(state); + break; +#endif + +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + rrdpush_compressor_destroy_lz4(state); + break; +#endif + +#ifdef ENABLE_BROTLI + case COMPRESSION_ALGORITHM_BROTLI: + rrdpush_compressor_destroy_brotli(state); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + rrdpush_compressor_destroy_gzip(state); + break; } - // the ring buffer always has space for LZ4_MAX_MSG_SIZE - memcpy(state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos, data, size); + state->initialized = false; - // this call needs the last 64K of our previous data - // they are available in the ring buffer - long int compressed_data_size = LZ4_compress_fast_continue( - state->stream.lz4_stream, - state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos, - state->compression_result_buffer + RRDPUSH_COMPRESSION_SIGNATURE_SIZE, - (int)size, - (int)max_dst_size, - 1); + simple_ring_buffer_destroy(&state->input); + simple_ring_buffer_destroy(&state->output); +} + +size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char **out) { + size_t ret = 0; + + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + ret = rrdpush_compress_zstd(state, data, size, out); + break; +#endif - if (compressed_data_size < 0) { - netdata_log_error("Data compression error: %ld", compressed_data_size); +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + ret = rrdpush_compress_lz4(state, data, size, out); + break; +#endif + +#ifdef ENABLE_BROTLI + case COMPRESSION_ALGORITHM_BROTLI: + ret = rrdpush_compress_brotli(state, data, size, out); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + ret = rrdpush_compress_gzip(state, data, size, out); + break; + } + + if(unlikely(ret >= COMPRESSION_MAX_CHUNK)) { + netdata_log_error("RRDPUSH_COMPRESS: compressed data is %zu bytes, which is >= than the max chunk size %d", + ret, COMPRESSION_MAX_CHUNK); return 0; } - // update the next writing position of the ring buffer - state->stream.input_ring_buffer_pos += size; - if(unlikely(state->stream.input_ring_buffer_pos >= state->stream.input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE)) - state->stream.input_ring_buffer_pos = 0; + return ret; +} - // update the signature header - uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8; - *(uint32_t *)state->compression_result_buffer = len | RRDPUSH_COMPRESSION_SIGNATURE; - *out = state->compression_result_buffer; - netdata_log_debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size); - return compressed_data_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE; +// ---------------------------------------------------------------------------- +// decompressor public API + +void rrdpush_decompressor_destroy(struct decompressor_state *state) { + if(unlikely(!state->initialized)) + return; + + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + rrdpush_decompressor_destroy_zstd(state); + break; +#endif + +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + rrdpush_decompressor_destroy_lz4(state); + break; +#endif + +#ifdef ENABLE_BROTLI + case COMPRESSION_ALGORITHM_BROTLI: + rrdpush_decompressor_destroy_brotli(state); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + rrdpush_decompressor_destroy_gzip(state); + break; + } + + simple_ring_buffer_destroy(&state->output); + + state->initialized = false; } -/* - * Decompress the compressed data in the internal buffer - * Return the size of uncompressed data or 0 for error - */ -size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { - if (unlikely(!state || !compressed_data || !compressed_size)) - return 0; +void rrdpush_decompressor_init(struct decompressor_state *state) { + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + rrdpush_decompressor_init_zstd(state); + break; +#endif - if(unlikely(state->stream.read_at != state->stream.write_at)) - fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + rrdpush_decompressor_init_lz4(state); + break; +#endif + +#ifdef ENABLE_BROTLI + case COMPRESSION_ALGORITHM_BROTLI: + rrdpush_decompressor_init_brotli(state); + break; +#endif - if (unlikely(state->stream.write_at >= state->stream.size / 2)) { - state->stream.write_at = 0; - state->stream.read_at = 0; + default: + case COMPRESSION_ALGORITHM_GZIP: + rrdpush_decompressor_init_gzip(state); + break; } - long int decompressed_size = LZ4_decompress_safe_continue( - state->stream.lz4_stream - , compressed_data - , state->stream.buffer + state->stream.write_at - , (int)compressed_size - , (int)(state->stream.size - state->stream.write_at) - ); + state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE; + simple_ring_buffer_reset(&state->output); +} + +size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(state->output.read_pos != state->output.write_pos)) + fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); + + size_t ret = 0; + + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + ret = rrdpush_decompress_zstd(state, compressed_data, compressed_size); + break; +#endif + +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + ret = rrdpush_decompress_lz4(state, compressed_data, compressed_size); + break; +#endif + +#ifdef ENABLE_BROTLI + case COMPRESSION_ALGORITHM_BROTLI: + ret = rrdpush_decompress_brotli(state, compressed_data, compressed_size); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + ret = rrdpush_decompress_gzip(state, compressed_data, compressed_size); + break; + } - if (unlikely(decompressed_size < 0)) { - netdata_log_error("RRDPUSH DECOMPRESS: decompressor returned negative decompressed bytes: %ld", decompressed_size); + // for backwards compatibility we cannot check for COMPRESSION_MAX_MSG_SIZE, + // because old children may send this big payloads. + if(unlikely(ret > COMPRESSION_MAX_CHUNK)) { + netdata_log_error("RRDPUSH_DECOMPRESS: decompressed data is %zu bytes, which is bigger than the max msg size %d", + ret, COMPRESSION_MAX_CHUNK); return 0; } - if(unlikely(decompressed_size + state->stream.write_at > state->stream.size)) - fatal("RRDPUSH DECOMPRESS: decompressor overflown the stream_buffer. size: %zu, pos: %zu, added: %ld, " - "exceeding the buffer by %zu" - , state->stream.size - , state->stream.write_at - , decompressed_size - , (size_t)(state->stream.write_at + decompressed_size - state->stream.size) - ); + return ret; +} + +// ---------------------------------------------------------------------------- +// unit test + +static inline long int my_random (void) { + return random(); +} + +void unittest_generate_random_name(char *dst, size_t size) { + if(size < 7) + size = 7; - state->stream.write_at += decompressed_size; + size_t len = 5 + my_random() % (size - 6); - // statistics - state->total_compressed += compressed_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE; - state->total_uncompressed += decompressed_size; - state->packet_count++; + for(size_t i = 0; i < len ; i++) { + if(my_random() % 2 == 0) + dst[i] = 'A' + my_random() % 26; + else + dst[i] = 'a' + my_random() % 26; + } - return decompressed_size; + dst[len] = '\0'; } -void rrdpush_decompressor_reset(struct decompressor_state *state) { - if(!state->initialized) { - state->initialized = true; - state->stream.lz4_stream = LZ4_createStreamDecode(); - state->stream.size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2; - state->stream.buffer = mallocz(state->stream.size); +void unittest_generate_message(BUFFER *wb, time_t now_s, size_t counter) { + bool with_slots = true; + NUMBER_ENCODING integer_encoding = NUMBER_ENCODING_BASE64; + NUMBER_ENCODING doubles_encoding = NUMBER_ENCODING_BASE64; + time_t update_every = 1; + time_t point_end_time_s = now_s; + time_t wall_clock_time_s = now_s; + size_t chart_slot = counter + 1; + size_t dimensions = 2 + my_random() % 5; + char chart[RRD_ID_LENGTH_MAX + 1] = "name"; + unittest_generate_random_name(chart, 5 + my_random() % 30); + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, chart_slot); } - LZ4_setStreamDecode(state->stream.lz4_stream, NULL, 0); + buffer_fast_strcat(wb, " '", 2); + buffer_strcat(wb, chart); + buffer_fast_strcat(wb, "' ", 2); + buffer_print_uint64_encoded(wb, integer_encoding, update_every); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s); + buffer_fast_strcat(wb, " ", 1); + if(point_end_time_s == wall_clock_time_s) + buffer_fast_strcat(wb, "#", 1); + else + buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time_s); + buffer_fast_strcat(wb, "\n", 1); + + + for(size_t d = 0; d < dimensions ;d++) { + size_t dim_slot = d + 1; + char dim_id[RRD_ID_LENGTH_MAX + 1] = "dimension"; + unittest_generate_random_name(dim_id, 10 + my_random() % 20); + int64_t last_collected_value = (my_random() % 2 == 0) ? (int64_t)(counter + d) : (int64_t)my_random(); + NETDATA_DOUBLE value = (my_random() % 2 == 0) ? (NETDATA_DOUBLE)my_random() / ((NETDATA_DOUBLE)my_random() + 1) : (NETDATA_DOUBLE)last_collected_value; + SN_FLAGS flags = (my_random() % 1000 == 0) ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS; + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, dim_slot); + } + + buffer_fast_strcat(wb, " '", 2); + buffer_strcat(wb, dim_id); + buffer_fast_strcat(wb, "' ", 2); + buffer_print_int64_encoded(wb, integer_encoding, last_collected_value); + buffer_fast_strcat(wb, " ", 1); + + if((NETDATA_DOUBLE)last_collected_value == value) + buffer_fast_strcat(wb, "#", 1); + else + buffer_print_netdata_double_encoded(wb, doubles_encoding, value); + + buffer_fast_strcat(wb, " ", 1); + buffer_print_sn_flags(wb, flags, true); + buffer_fast_strcat(wb, "\n", 1); + } - state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE; - state->stream.write_at = 0; - state->stream.read_at = 0; + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); } -void rrdpush_decompressor_destroy(struct decompressor_state *state) { - if(unlikely(!state->initialized)) - return; +int unittest_rrdpush_compression_speed(compression_algorithm_t algorithm, const char *name) { + fprintf(stderr, "\nTesting streaming compression speed with %s\n", name); + + struct compressor_state cctx = { + .initialized = false, + .algorithm = algorithm, + }; + struct decompressor_state dctx = { + .initialized = false, + .algorithm = algorithm, + }; + + rrdpush_compressor_init(&cctx); + rrdpush_decompressor_init(&dctx); + + int errors = 0; + + BUFFER *wb = buffer_create(COMPRESSION_MAX_MSG_SIZE, NULL); + time_t now_s = now_realtime_sec(); + usec_t compression_ut = 0; + usec_t decompression_ut = 0; + size_t bytes_compressed = 0; + size_t bytes_uncompressed = 0; + + usec_t compression_started_ut = now_monotonic_usec(); + usec_t decompression_started_ut = compression_started_ut; + + for(int i = 0; i < 10000 ;i++) { + compression_started_ut = now_monotonic_usec(); + decompression_ut += compression_started_ut - decompression_started_ut; + + buffer_flush(wb); + while(buffer_strlen(wb) < COMPRESSION_MAX_MSG_SIZE - 1024) + unittest_generate_message(wb, now_s, i); + + const char *txt = buffer_tostring(wb); + size_t txt_len = buffer_strlen(wb); + bytes_uncompressed += txt_len; + + const char *out; + size_t size = rrdpush_compress(&cctx, txt, txt_len, &out); + + bytes_compressed += size; + decompression_started_ut = now_monotonic_usec(); + compression_ut += decompression_started_ut - compression_started_ut; + + if(size == 0) { + fprintf(stderr, "iteration %d: compressed size %zu is zero\n", + i, size); + errors++; + goto cleanup; + } + else if(size >= COMPRESSION_MAX_CHUNK) { + fprintf(stderr, "iteration %d: compressed size %zu exceeds max allowed size\n", + i, size); + errors++; + goto cleanup; + } + else { + size_t dtxt_len = rrdpush_decompress(&dctx, out, size); + char *dtxt = (char *) &dctx.output.data[dctx.output.read_pos]; + + if(rrdpush_decompressed_bytes_in_buffer(&dctx) != dtxt_len) { + fprintf(stderr, "iteration %d: decompressed size %zu does not rrdpush_decompressed_bytes_in_buffer() %zu\n", + i, dtxt_len, rrdpush_decompressed_bytes_in_buffer(&dctx) + ); + errors++; + goto cleanup; + } + + if(!dtxt_len) { + fprintf(stderr, "iteration %d: decompressed size is zero\n", i); + errors++; + goto cleanup; + } + else if(dtxt_len != txt_len) { + fprintf(stderr, "iteration %d: decompressed size %zu does not match original size %zu\n", + i, dtxt_len, txt_len + ); + errors++; + goto cleanup; + } + else { + if(memcmp(txt, dtxt, txt_len) != 0) { + fprintf(stderr, "iteration %d: decompressed data '%s' do not match original data length %zu\n", + i, dtxt, txt_len); + errors++; + goto cleanup; + } + } + } + + // here we are supposed to copy the data and advance the position + dctx.output.read_pos += rrdpush_decompressed_bytes_in_buffer(&dctx); + } + +cleanup: + rrdpush_compressor_destroy(&cctx); + rrdpush_decompressor_destroy(&dctx); + + if(errors) + fprintf(stderr, "Compression with %s: FAILED (%d errors)\n", name, errors); + else + fprintf(stderr, "Compression with %s: OK " + "(compression %zu usec, decompression %zu usec, bytes raw %zu, compressed %zu, savings ratio %0.2f%%)\n", + name, compression_ut, decompression_ut, + bytes_uncompressed, bytes_compressed, + 100.0 - (double)bytes_compressed * 100.0 / (double)bytes_uncompressed); - if (state->stream.lz4_stream) { - LZ4_freeStreamDecode(state->stream.lz4_stream); - state->stream.lz4_stream = NULL; + return errors; +} + +int unittest_rrdpush_compression(compression_algorithm_t algorithm, const char *name) { + fprintf(stderr, "\nTesting streaming compression with %s\n", name); + + struct compressor_state cctx = { + .initialized = false, + .algorithm = algorithm, + }; + struct decompressor_state dctx = { + .initialized = false, + .algorithm = algorithm, + }; + + char txt[COMPRESSION_MAX_MSG_SIZE]; + + rrdpush_compressor_init(&cctx); + rrdpush_decompressor_init(&dctx); + + int errors = 0; + + memset(txt, '=', COMPRESSION_MAX_MSG_SIZE); + + for(int i = 0; i < COMPRESSION_MAX_MSG_SIZE ;i++) { + txt[i] = 'A' + (i % 26); + size_t txt_len = i + 1; + + const char *out; + size_t size = rrdpush_compress(&cctx, txt, txt_len, &out); + + if(size == 0) { + fprintf(stderr, "iteration %d: compressed size %zu is zero\n", + i, size); + errors++; + goto cleanup; + } + else if(size >= COMPRESSION_MAX_CHUNK) { + fprintf(stderr, "iteration %d: compressed size %zu exceeds max allowed size\n", + i, size); + errors++; + goto cleanup; + } + else { + size_t dtxt_len = rrdpush_decompress(&dctx, out, size); + char *dtxt = (char *) &dctx.output.data[dctx.output.read_pos]; + + if(rrdpush_decompressed_bytes_in_buffer(&dctx) != dtxt_len) { + fprintf(stderr, "iteration %d: decompressed size %zu does not rrdpush_decompressed_bytes_in_buffer() %zu\n", + i, dtxt_len, rrdpush_decompressed_bytes_in_buffer(&dctx) + ); + errors++; + goto cleanup; + } + + if(!dtxt_len) { + fprintf(stderr, "iteration %d: decompressed size is zero\n", i); + errors++; + goto cleanup; + } + else if(dtxt_len != txt_len) { + fprintf(stderr, "iteration %d: decompressed size %zu does not match original size %zu\n", + i, dtxt_len, txt_len + ); + errors++; + goto cleanup; + } + else { + if(memcmp(txt, dtxt, txt_len) != 0) { + txt[txt_len] = '\0'; + dtxt[txt_len + 5] = '\0'; + + fprintf(stderr, "iteration %d: decompressed data '%s' do not match original data '%s' of length %zu\n", + i, dtxt, txt, txt_len); + errors++; + goto cleanup; + } + } + } + + // fill the compressed buffer with garbage + memset((void *)out, 'x', size); + + // here we are supposed to copy the data and advance the position + dctx.output.read_pos += rrdpush_decompressed_bytes_in_buffer(&dctx); } - freez(state->stream.buffer); - state->stream.buffer = NULL; +cleanup: + rrdpush_compressor_destroy(&cctx); + rrdpush_decompressor_destroy(&dctx); - state->initialized = false; + if(errors) + fprintf(stderr, "Compression with %s: FAILED (%d errors)\n", name, errors); + else + fprintf(stderr, "Compression with %s: OK\n", name); + + return errors; } -#endif +int unittest_rrdpush_compressions(void) { + int ret = 0; + + ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_ZSTD, "ZSTD"); + ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_LZ4, "LZ4"); + ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_BROTLI, "BROTLI"); + ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_GZIP, "GZIP"); + + ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_ZSTD, "ZSTD"); + ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_LZ4, "LZ4"); + ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_BROTLI, "BROTLI"); + ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_GZIP, "GZIP"); + + return ret; +} diff --git a/streaming/compression.h b/streaming/compression.h new file mode 100644 index 000000000..a67f65b83 --- /dev/null +++ b/streaming/compression.h @@ -0,0 +1,175 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "rrdpush.h" + +#ifndef NETDATA_RRDPUSH_COMPRESSION_H +#define NETDATA_RRDPUSH_COMPRESSION_H 1 + +// signature MUST end with a newline + +#if COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD) +#error "COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)" +#endif + +typedef uint32_t rrdpush_signature_t; +#define RRDPUSH_COMPRESSION_SIGNATURE ((rrdpush_signature_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) +#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((rrdpush_signature_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) +#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE sizeof(rrdpush_signature_t) + +static inline rrdpush_signature_t rrdpush_compress_encode_signature(size_t compressed_data_size) { + rrdpush_signature_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8; + return len | RRDPUSH_COMPRESSION_SIGNATURE; +} + +typedef enum { + COMPRESSION_ALGORITHM_NONE = 0, + COMPRESSION_ALGORITHM_ZSTD, + COMPRESSION_ALGORITHM_LZ4, + COMPRESSION_ALGORITHM_GZIP, + COMPRESSION_ALGORITHM_BROTLI, + + // terminator + COMPRESSION_ALGORITHM_MAX, +} compression_algorithm_t; + +extern int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX]; + +// this defines the order the algorithms will be selected by the receiver (parent) +#define RRDPUSH_COMPRESSION_ALGORITHMS_ORDER "zstd lz4 brotli gzip" + +// ---------------------------------------------------------------------------- + +typedef struct simple_ring_buffer { + const char *data; + size_t size; + size_t read_pos; + size_t write_pos; +} SIMPLE_RING_BUFFER; + +static inline void simple_ring_buffer_reset(SIMPLE_RING_BUFFER *b) { + b->read_pos = b->write_pos = 0; +} + +static inline void simple_ring_buffer_make_room(SIMPLE_RING_BUFFER *b, size_t size) { + if(b->write_pos + size > b->size) { + if(!b->size) + b->size = COMPRESSION_MAX_CHUNK; + else + b->size *= 2; + + if(b->write_pos + size > b->size) + b->size += size; + + b->data = (const char *)reallocz((void *)b->data, b->size); + } +} + +static inline void simple_ring_buffer_append_data(SIMPLE_RING_BUFFER *b, const void *data, size_t size) { + simple_ring_buffer_make_room(b, size); + memcpy((void *)(b->data + b->write_pos), data, size); + b->write_pos += size; +} + +static inline void simple_ring_buffer_destroy(SIMPLE_RING_BUFFER *b) { + freez((void *)b->data); + b->data = NULL; + b->read_pos = b->write_pos = b->size = 0; +} + +// ---------------------------------------------------------------------------- + +struct compressor_state { + bool initialized; + compression_algorithm_t algorithm; + + SIMPLE_RING_BUFFER input; + SIMPLE_RING_BUFFER output; + + int level; + void *stream; + + struct { + size_t total_compressed; + size_t total_uncompressed; + size_t total_compressions; + } sender_locked; +}; + +void rrdpush_compressor_init(struct compressor_state *state); +void rrdpush_compressor_destroy(struct compressor_state *state); +size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char **out); + +// ---------------------------------------------------------------------------- + +struct decompressor_state { + bool initialized; + compression_algorithm_t algorithm; + size_t signature_size; + + size_t total_compressed; + size_t total_uncompressed; + size_t total_compressions; + + SIMPLE_RING_BUFFER output; + + void *stream; +}; + +void rrdpush_decompressor_destroy(struct decompressor_state *state); +void rrdpush_decompressor_init(struct decompressor_state *state); +size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); + +static inline size_t rrdpush_decompress_decode_signature(const char *data, size_t data_size) { + if (unlikely(!data || !data_size)) + return 0; + + if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE)) + return 0; + + rrdpush_signature_t sign = *(rrdpush_signature_t *)data; + if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE)) + return 0; + + size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7)); + return length; +} + +static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) { + if(unlikely(state->output.read_pos != state->output.write_pos)) + fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); + + return rrdpush_decompress_decode_signature(header, header_size); +} + +static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) { + if(unlikely(state->output.read_pos > state->output.write_pos)) + fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); + + return state->output.write_pos - state->output.read_pos; +} + +static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) { + if (unlikely(!state || !size || !dst)) + return 0; + + size_t remaining = rrdpush_decompressed_bytes_in_buffer(state); + + if(unlikely(!remaining)) + return 0; + + size_t bytes_to_return = size; + if(bytes_to_return > remaining) + bytes_to_return = remaining; + + memcpy(dst, state->output.data + state->output.read_pos, bytes_to_return); + state->output.read_pos += bytes_to_return; + + if(unlikely(state->output.read_pos > state->output.write_pos)) + fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); + + return bytes_to_return; +} + +// ---------------------------------------------------------------------------- + +#endif // NETDATA_RRDPUSH_COMPRESSION_H 1 diff --git a/streaming/compression_brotli.c b/streaming/compression_brotli.c new file mode 100644 index 000000000..cf52f3bca --- /dev/null +++ b/streaming/compression_brotli.c @@ -0,0 +1,142 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression_brotli.h" + +#ifdef ENABLE_BROTLI +#include <brotli/encode.h> +#include <brotli/decode.h> + +void rrdpush_compressor_init_brotli(struct compressor_state *state) { + if (!state->initialized) { + state->initialized = true; + state->stream = BrotliEncoderCreateInstance(NULL, NULL, NULL); + + if (state->level < BROTLI_MIN_QUALITY) { + state->level = BROTLI_MIN_QUALITY; + } else if (state->level > BROTLI_MAX_QUALITY) { + state->level = BROTLI_MAX_QUALITY; + } + + BrotliEncoderSetParameter(state->stream, BROTLI_PARAM_QUALITY, state->level); + } +} + +void rrdpush_compressor_destroy_brotli(struct compressor_state *state) { + if (state->stream) { + BrotliEncoderDestroyInstance(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_compress_brotli(struct compressor_state *state, const char *data, size_t size, const char **out) { + if (unlikely(!state || !size || !out)) + return 0; + + simple_ring_buffer_make_room(&state->output, MAX(BrotliEncoderMaxCompressedSize(size), COMPRESSION_MAX_CHUNK)); + + size_t available_out = state->output.size; + + size_t available_in = size; + const uint8_t *next_in = (const uint8_t *)data; + uint8_t *next_out = (uint8_t *)state->output.data; + + if (!BrotliEncoderCompressStream(state->stream, BROTLI_OPERATION_FLUSH, &available_in, &next_in, &available_out, &next_out, NULL)) { + netdata_log_error("STREAM: Brotli compression failed."); + return 0; + } + + if(available_in != 0) { + netdata_log_error("STREAM: BrotliEncoderCompressStream() did not use all the input buffer, %zu bytes out of %zu remain", + available_in, size); + return 0; + } + + size_t compressed_size = state->output.size - available_out; + if(available_out == 0) { + netdata_log_error("STREAM: BrotliEncoderCompressStream() needs a bigger output buffer than the one we provided " + "(output buffer %zu bytes, compressed payload %zu bytes)", + state->output.size, size); + return 0; + } + + if(compressed_size == 0) { + netdata_log_error("STREAM: BrotliEncoderCompressStream() did not produce any output from the input provided " + "(input buffer %zu bytes)", + size); + return 0; + } + + state->sender_locked.total_compressions++; + state->sender_locked.total_uncompressed += size - available_in; + state->sender_locked.total_compressed += compressed_size; + + *out = state->output.data; + return compressed_size; +} + +void rrdpush_decompressor_init_brotli(struct decompressor_state *state) { + if (!state->initialized) { + state->initialized = true; + state->stream = BrotliDecoderCreateInstance(NULL, NULL, NULL); + + simple_ring_buffer_make_room(&state->output, COMPRESSION_MAX_CHUNK); + } +} + +void rrdpush_decompressor_destroy_brotli(struct decompressor_state *state) { + if (state->stream) { + BrotliDecoderDestroyInstance(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_decompress_brotli(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(!state || !compressed_data || !compressed_size)) + return 0; + + // The state.output ring buffer is always EMPTY at this point, + // meaning that (state->output.read_pos == state->output.write_pos) + // However, THEY ARE NOT ZERO. + + size_t available_out = state->output.size; + size_t available_in = compressed_size; + const uint8_t *next_in = (const uint8_t *)compressed_data; + uint8_t *next_out = (uint8_t *)state->output.data; + + if (BrotliDecoderDecompressStream(state->stream, &available_in, &next_in, &available_out, &next_out, NULL) == BROTLI_DECODER_RESULT_ERROR) { + netdata_log_error("STREAM: Brotli decompression failed."); + return 0; + } + + if(available_in != 0) { + netdata_log_error("STREAM: BrotliDecoderDecompressStream() did not use all the input buffer, %zu bytes out of %zu remain", + available_in, compressed_size); + return 0; + } + + size_t decompressed_size = state->output.size - available_out; + if(available_out == 0) { + netdata_log_error("STREAM: BrotliDecoderDecompressStream() needs a bigger output buffer than the one we provided " + "(output buffer %zu bytes, compressed payload %zu bytes)", + state->output.size, compressed_size); + return 0; + } + + if(decompressed_size == 0) { + netdata_log_error("STREAM: BrotliDecoderDecompressStream() did not produce any output from the input provided " + "(input buffer %zu bytes)", + compressed_size); + return 0; + } + + state->output.read_pos = 0; + state->output.write_pos = decompressed_size; + + state->total_compressed += compressed_size - available_in; + state->total_uncompressed += decompressed_size; + state->total_compressions++; + + return decompressed_size; +} + +#endif // ENABLE_BROTLI diff --git a/streaming/compression_brotli.h b/streaming/compression_brotli.h new file mode 100644 index 000000000..4955e5a82 --- /dev/null +++ b/streaming/compression_brotli.h @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression.h" + +#ifndef NETDATA_STREAMING_COMPRESSION_BROTLI_H +#define NETDATA_STREAMING_COMPRESSION_BROTLI_H + +void rrdpush_compressor_init_brotli(struct compressor_state *state); +void rrdpush_compressor_destroy_brotli(struct compressor_state *state); +size_t rrdpush_compress_brotli(struct compressor_state *state, const char *data, size_t size, const char **out); +size_t rrdpush_decompress_brotli(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); +void rrdpush_decompressor_init_brotli(struct decompressor_state *state); +void rrdpush_decompressor_destroy_brotli(struct decompressor_state *state); + +#endif //NETDATA_STREAMING_COMPRESSION_BROTLI_H diff --git a/streaming/compression_gzip.c b/streaming/compression_gzip.c new file mode 100644 index 000000000..c4ef3af05 --- /dev/null +++ b/streaming/compression_gzip.c @@ -0,0 +1,164 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression_gzip.h" +#include <zlib.h> + +void rrdpush_compressor_init_gzip(struct compressor_state *state) { + if (!state->initialized) { + state->initialized = true; + + // Initialize deflate stream + z_stream *strm = state->stream = (z_stream *) mallocz(sizeof(z_stream)); + strm->zalloc = Z_NULL; + strm->zfree = Z_NULL; + strm->opaque = Z_NULL; + + if(state->level < Z_BEST_SPEED) + state->level = Z_BEST_SPEED; + + if(state->level > Z_BEST_COMPRESSION) + state->level = Z_BEST_COMPRESSION; + + // int r = deflateInit2(strm, Z_BEST_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY); + int r = deflateInit2(strm, state->level, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY); + if (r != Z_OK) { + netdata_log_error("Failed to initialize deflate with error: %d", r); + freez(state->stream); + state->initialized = false; + return; + } + + } +} + +void rrdpush_compressor_destroy_gzip(struct compressor_state *state) { + if (state->stream) { + deflateEnd(state->stream); + freez(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_compress_gzip(struct compressor_state *state, const char *data, size_t size, const char **out) { + if (unlikely(!state || !size || !out)) + return 0; + + simple_ring_buffer_make_room(&state->output, deflateBound(state->stream, size)); + + z_stream *strm = state->stream; + strm->avail_in = (uInt)size; + strm->next_in = (Bytef *)data; + strm->avail_out = (uInt)state->output.size; + strm->next_out = (Bytef *)state->output.data; + + int ret = deflate(strm, Z_SYNC_FLUSH); + if (ret != Z_OK && ret != Z_STREAM_END) { + netdata_log_error("STREAM: deflate() failed with error %d", ret); + return 0; + } + + if(strm->avail_in != 0) { + netdata_log_error("STREAM: deflate() did not use all the input buffer, %u bytes out of %zu remain", + strm->avail_in, size); + return 0; + } + + if(strm->avail_out == 0) { + netdata_log_error("STREAM: deflate() needs a bigger output buffer than the one we provided " + "(output buffer %zu bytes, compressed payload %zu bytes)", + state->output.size, size); + return 0; + } + + size_t compressed_data_size = state->output.size - strm->avail_out; + + if(compressed_data_size == 0) { + netdata_log_error("STREAM: deflate() did not produce any output " + "(output buffer %zu bytes, compressed payload %zu bytes)", + state->output.size, size); + return 0; + } + + state->sender_locked.total_compressions++; + state->sender_locked.total_uncompressed += size; + state->sender_locked.total_compressed += compressed_data_size; + + *out = state->output.data; + return compressed_data_size; +} + +void rrdpush_decompressor_init_gzip(struct decompressor_state *state) { + if (!state->initialized) { + state->initialized = true; + + // Initialize inflate stream + z_stream *strm = state->stream = (z_stream *)mallocz(sizeof(z_stream)); + strm->zalloc = Z_NULL; + strm->zfree = Z_NULL; + strm->opaque = Z_NULL; + + int r = inflateInit2(strm, 15 + 16); + if (r != Z_OK) { + netdata_log_error("Failed to initialize inflateInit2() with error: %d", r); + freez(state->stream); + state->initialized = false; + return; + } + + simple_ring_buffer_make_room(&state->output, COMPRESSION_MAX_CHUNK); + } +} + +void rrdpush_decompressor_destroy_gzip(struct decompressor_state *state) { + if (state->stream) { + inflateEnd(state->stream); + freez(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_decompress_gzip(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(!state || !compressed_data || !compressed_size)) + return 0; + + // The state.output ring buffer is always EMPTY at this point, + // meaning that (state->output.read_pos == state->output.write_pos) + // However, THEY ARE NOT ZERO. + + z_stream *strm = state->stream; + strm->avail_in = (uInt)compressed_size; + strm->next_in = (Bytef *)compressed_data; + strm->avail_out = (uInt)state->output.size; + strm->next_out = (Bytef *)state->output.data; + + int ret = inflate(strm, Z_SYNC_FLUSH); + if (ret != Z_STREAM_END && ret != Z_OK) { + netdata_log_error("RRDPUSH DECOMPRESS: inflate() failed with error %d", ret); + return 0; + } + + if(strm->avail_in != 0) { + netdata_log_error("RRDPUSH DECOMPRESS: inflate() did not use all compressed data we provided " + "(compressed payload %zu bytes, remaining to be uncompressed %u)" + , compressed_size, strm->avail_in); + return 0; + } + + if(strm->avail_out == 0) { + netdata_log_error("RRDPUSH DECOMPRESS: inflate() needs a bigger output buffer than the one we provided " + "(compressed payload %zu bytes, output buffer size %zu bytes)" + , compressed_size, state->output.size); + return 0; + } + + size_t decompressed_size = state->output.size - strm->avail_out; + + state->output.read_pos = 0; + state->output.write_pos = decompressed_size; + + state->total_compressed += compressed_size; + state->total_uncompressed += decompressed_size; + state->total_compressions++; + + return decompressed_size; +} diff --git a/streaming/compression_gzip.h b/streaming/compression_gzip.h new file mode 100644 index 000000000..85f34bc6d --- /dev/null +++ b/streaming/compression_gzip.h @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression.h" + +#ifndef NETDATA_STREAMING_COMPRESSION_GZIP_H +#define NETDATA_STREAMING_COMPRESSION_GZIP_H + +void rrdpush_compressor_init_gzip(struct compressor_state *state); +void rrdpush_compressor_destroy_gzip(struct compressor_state *state); +size_t rrdpush_compress_gzip(struct compressor_state *state, const char *data, size_t size, const char **out); +size_t rrdpush_decompress_gzip(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); +void rrdpush_decompressor_init_gzip(struct decompressor_state *state); +void rrdpush_decompressor_destroy_gzip(struct decompressor_state *state); + +#endif //NETDATA_STREAMING_COMPRESSION_GZIP_H diff --git a/streaming/compression_lz4.c b/streaming/compression_lz4.c new file mode 100644 index 000000000..f5174134e --- /dev/null +++ b/streaming/compression_lz4.c @@ -0,0 +1,143 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression_lz4.h" + +#ifdef ENABLE_LZ4 +#include "lz4.h" + +// ---------------------------------------------------------------------------- +// compress + +void rrdpush_compressor_init_lz4(struct compressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream = LZ4_createStream(); + + // LZ4 needs access to the last 64KB of source data + // so, we keep twice the size of each message + simple_ring_buffer_make_room(&state->input, 65536 + COMPRESSION_MAX_CHUNK * 2); + } +} + +void rrdpush_compressor_destroy_lz4(struct compressor_state *state) { + if (state->stream) { + LZ4_freeStream(state->stream); + state->stream = NULL; + } +} + +/* + * Compress the given block of data + * Compressed data will remain in the internal buffer until the next invocation + * Return the size of compressed data block as result and the pointer to internal buffer using the last argument + * or 0 in case of error + */ +size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out) { + if(unlikely(!state || !size || !out)) + return 0; + + // we need to keep the last 64K of our previous source data + // as they were in the ring buffer + + simple_ring_buffer_make_room(&state->output, LZ4_COMPRESSBOUND(size)); + + if(state->input.write_pos + size > state->input.size) + // the input buffer cannot fit out data, restart from zero + simple_ring_buffer_reset(&state->input); + + simple_ring_buffer_append_data(&state->input, data, size); + + long int compressed_data_size = LZ4_compress_fast_continue( + state->stream, + state->input.data + state->input.read_pos, + (char *)state->output.data, + (int)(state->input.write_pos - state->input.read_pos), + (int)state->output.size, + state->level); + + if (compressed_data_size <= 0) { + netdata_log_error("STREAM: LZ4_compress_fast_continue() returned %ld " + "(source is %zu bytes, output buffer can fit %zu bytes)", + compressed_data_size, size, state->output.size); + return 0; + } + + state->input.read_pos = state->input.write_pos; + + state->sender_locked.total_compressions++; + state->sender_locked.total_uncompressed += size; + state->sender_locked.total_compressed += compressed_data_size; + + *out = state->output.data; + return compressed_data_size; +} + +// ---------------------------------------------------------------------------- +// decompress + +void rrdpush_decompressor_init_lz4(struct decompressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream = LZ4_createStreamDecode(); + simple_ring_buffer_make_room(&state->output, 65536 + COMPRESSION_MAX_CHUNK * 2); + } +} + +void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state) { + if (state->stream) { + LZ4_freeStreamDecode(state->stream); + state->stream = NULL; + } +} + +/* + * Decompress the compressed data in the internal buffer + * Return the size of uncompressed data or 0 for error + */ +size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(!state || !compressed_data || !compressed_size)) + return 0; + + // The state.output ring buffer is always EMPTY at this point, + // meaning that (state->output.read_pos == state->output.write_pos) + // However, THEY ARE NOT ZERO. + + if (unlikely(state->output.write_pos + COMPRESSION_MAX_CHUNK > state->output.size)) + // the input buffer cannot fit out data, restart from zero + simple_ring_buffer_reset(&state->output); + + long int decompressed_size = LZ4_decompress_safe_continue( + state->stream + , compressed_data + , (char *)(state->output.data + state->output.write_pos) + , (int)compressed_size + , (int)(state->output.size - state->output.write_pos) + ); + + if (unlikely(decompressed_size < 0)) { + netdata_log_error("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() returned negative value: %ld " + "(compressed chunk is %zu bytes)" + , decompressed_size, compressed_size); + return 0; + } + + if(unlikely(decompressed_size + state->output.write_pos > state->output.size)) + fatal("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() overflown the stream_buffer " + "(size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu)" + , state->output.size + , state->output.write_pos + , decompressed_size + , (size_t)(state->output.write_pos + decompressed_size - state->output.size) + ); + + state->output.write_pos += decompressed_size; + + // statistics + state->total_compressed += compressed_size; + state->total_uncompressed += decompressed_size; + state->total_compressions++; + + return decompressed_size; +} + +#endif // ENABLE_LZ4 diff --git a/streaming/compression_lz4.h b/streaming/compression_lz4.h new file mode 100644 index 000000000..69f0fadcc --- /dev/null +++ b/streaming/compression_lz4.h @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression.h" + +#ifndef NETDATA_STREAMING_COMPRESSION_LZ4_H +#define NETDATA_STREAMING_COMPRESSION_LZ4_H + +#ifdef ENABLE_LZ4 + +void rrdpush_compressor_init_lz4(struct compressor_state *state); +void rrdpush_compressor_destroy_lz4(struct compressor_state *state); +size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out); +size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); +void rrdpush_decompressor_init_lz4(struct decompressor_state *state); +void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state); + +#endif // ENABLE_LZ4 + +#endif //NETDATA_STREAMING_COMPRESSION_LZ4_H diff --git a/streaming/compression_zstd.c b/streaming/compression_zstd.c new file mode 100644 index 000000000..dabc044f7 --- /dev/null +++ b/streaming/compression_zstd.c @@ -0,0 +1,163 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression_zstd.h" + +#ifdef ENABLE_ZSTD +#include <zstd.h> + +void rrdpush_compressor_init_zstd(struct compressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream = ZSTD_createCStream(); + + if(state->level < 1) + state->level = 1; + + if(state->level > ZSTD_maxCLevel()) + state->level = ZSTD_maxCLevel(); + + size_t ret = ZSTD_initCStream(state->stream, state->level); + if(ZSTD_isError(ret)) + netdata_log_error("STREAM: ZSTD_initCStream() returned error: %s", ZSTD_getErrorName(ret)); + + // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_compressionLevel, 1); + // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_strategy, ZSTD_fast); + } +} + +void rrdpush_compressor_destroy_zstd(struct compressor_state *state) { + if(state->stream) { + ZSTD_freeCStream(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out) { + if(unlikely(!state || !size || !out)) + return 0; + + ZSTD_inBuffer inBuffer = { + .pos = 0, + .size = size, + .src = data, + }; + + size_t wanted_size = MAX(ZSTD_compressBound(inBuffer.size - inBuffer.pos), ZSTD_CStreamOutSize()); + simple_ring_buffer_make_room(&state->output, wanted_size); + + ZSTD_outBuffer outBuffer = { + .pos = 0, + .size = state->output.size, + .dst = (void *)state->output.data, + }; + + // compress + size_t ret = ZSTD_compressStream(state->stream, &outBuffer, &inBuffer); + + // error handling + if(ZSTD_isError(ret)) { + netdata_log_error("STREAM: ZSTD_compressStream() return error: %s", ZSTD_getErrorName(ret)); + return 0; + } + + if(inBuffer.pos < inBuffer.size) { + netdata_log_error("STREAM: ZSTD_compressStream() left unprocessed input (source payload %zu bytes, consumed %zu bytes)", + inBuffer.size, inBuffer.pos); + return 0; + } + + if(outBuffer.pos == 0) { + // ZSTD needs more input to flush the output, so let's flush it manually + ret = ZSTD_flushStream(state->stream, &outBuffer); + + if(ZSTD_isError(ret)) { + netdata_log_error("STREAM: ZSTD_flushStream() return error: %s", ZSTD_getErrorName(ret)); + return 0; + } + + if(outBuffer.pos == 0) { + netdata_log_error("STREAM: ZSTD_compressStream() returned zero compressed bytes " + "(source is %zu bytes, output buffer can fit %zu bytes) " + , size, outBuffer.size); + return 0; + } + } + + state->sender_locked.total_compressions++; + state->sender_locked.total_uncompressed += size; + state->sender_locked.total_compressed += outBuffer.pos; + + // return values + *out = state->output.data; + return outBuffer.pos; +} + +void rrdpush_decompressor_init_zstd(struct decompressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream = ZSTD_createDStream(); + + size_t ret = ZSTD_initDStream(state->stream); + if(ZSTD_isError(ret)) + netdata_log_error("STREAM: ZSTD_initDStream() returned error: %s", ZSTD_getErrorName(ret)); + + simple_ring_buffer_make_room(&state->output, MAX(COMPRESSION_MAX_CHUNK, ZSTD_DStreamOutSize())); + } +} + +void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state) { + if (state->stream) { + ZSTD_freeDStream(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(!state || !compressed_data || !compressed_size)) + return 0; + + // The state.output ring buffer is always EMPTY at this point, + // meaning that (state->output.read_pos == state->output.write_pos) + // However, THEY ARE NOT ZERO. + + ZSTD_inBuffer inBuffer = { + .pos = 0, + .size = compressed_size, + .src = compressed_data, + }; + + ZSTD_outBuffer outBuffer = { + .pos = 0, + .dst = (char *)state->output.data, + .size = state->output.size, + }; + + size_t ret = ZSTD_decompressStream( + state->stream + , &outBuffer + , &inBuffer); + + if(ZSTD_isError(ret)) { + netdata_log_error("STREAM: ZSTD_decompressStream() return error: %s", ZSTD_getErrorName(ret)); + return 0; + } + + if(inBuffer.pos < inBuffer.size) + fatal("RRDPUSH DECOMPRESS: ZSTD ZSTD_decompressStream() decompressed %zu bytes, " + "but %zu bytes of compressed data remain", + inBuffer.pos, inBuffer.size); + + size_t decompressed_size = outBuffer.pos; + + state->output.read_pos = 0; + state->output.write_pos = outBuffer.pos; + + // statistics + state->total_compressed += compressed_size; + state->total_uncompressed += decompressed_size; + state->total_compressions++; + + return decompressed_size; +} + +#endif // ENABLE_ZSTD diff --git a/streaming/compression_zstd.h b/streaming/compression_zstd.h new file mode 100644 index 000000000..bfabbf89d --- /dev/null +++ b/streaming/compression_zstd.h @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression.h" + +#ifndef NETDATA_STREAMING_COMPRESSION_ZSTD_H +#define NETDATA_STREAMING_COMPRESSION_ZSTD_H + +#ifdef ENABLE_ZSTD + +void rrdpush_compressor_init_zstd(struct compressor_state *state); +void rrdpush_compressor_destroy_zstd(struct compressor_state *state); +size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out); +size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); +void rrdpush_decompressor_init_zstd(struct decompressor_state *state); +void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state); + +#endif // ENABLE_ZSTD + +#endif //NETDATA_STREAMING_COMPRESSION_ZSTD_H diff --git a/streaming/receiver.c b/streaming/receiver.c index 10ef8b7d3..a12b94fb4 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -1,6 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" +#include "web/server/h2o/http_server.h" extern struct config stream_config; @@ -28,9 +29,7 @@ void receiver_state_free(struct receiver_state *rpt) { close(rpt->fd); } -#ifdef ENABLE_RRDPUSH_COMPRESSION rrdpush_decompressor_destroy(&rpt->decompressor); -#endif if(rpt->system_info) rrdhost_system_info_free(rpt->system_info); @@ -59,6 +58,11 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz return 0; } +#ifdef ENABLE_H2O + if (is_h2o_rrdpush(r)) + return (int)h2o_stream_read(r->h2o_ctx, buffer, size); +#endif + int tries = 100; ssize_t bytes_read; @@ -92,15 +96,44 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz return (int)bytes_read; } -static inline bool receiver_read_uncompressed(struct receiver_state *r) { +static inline STREAM_HANDSHAKE read_stream_error_to_reason(int code) { + if(code > 0) + return 0; + + switch(code) { + case 0: + // asked to read zero bytes + return STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER; + + case -1: + // EOF + return STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF; + + case -2: + // failed to read + return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED; + + case -3: + // timeout + return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT; + + default: + // anything else + return STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; + } +} + +static inline bool receiver_read_uncompressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) { #ifdef NETDATA_INTERNAL_CHECKS if(r->reader.read_buffer[r->reader.read_len] != '\0') fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); #endif int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1); - if(unlikely(bytes_read <= 0)) + if(unlikely(bytes_read <= 0)) { + *reason = read_stream_error_to_reason(bytes_read); return false; + } worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read); @@ -111,8 +144,7 @@ static inline bool receiver_read_uncompressed(struct receiver_state *r) { return true; } -#ifdef ENABLE_RRDPUSH_COMPRESSION -static inline bool receiver_read_compressed(struct receiver_state *r) { +static inline bool receiver_read_compressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) { internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0', "%s: read_buffer does not start with zero #2", __FUNCTION__ ); @@ -150,8 +182,10 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { int bytes_read = 0; do { int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read); - if (unlikely(ret <= 0)) + if (unlikely(ret <= 0)) { + *reason = read_stream_error_to_reason(ret); return false; + } bytes_read += ret; } while(unlikely(bytes_read < (int)r->decompressor.signature_size)); @@ -187,7 +221,7 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { int last_read_bytes = read_stream(r, &compressed[start], remaining); if (unlikely(last_read_bytes <= 0)) { - internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes); + *reason = read_stream_error_to_reason(last_read_bytes); return false; } @@ -217,57 +251,6 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { return true; } -#else // !ENABLE_RRDPUSH_COMPRESSION -static inline bool receiver_read_compressed(struct receiver_state *r) { - return receiver_read_uncompressed(r); -} -#endif // ENABLE_RRDPUSH_COMPRESSION - -/* Produce a full line if one exists, statefully return where we start next time. - * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. - */ -inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) { - buffer_need_bytes(dst, reader->read_len - reader->pos + 2); - - size_t start = reader->pos; - - char *ss = &reader->read_buffer[start]; - char *se = &reader->read_buffer[reader->read_len]; - char *ds = &dst->buffer[dst->len]; - char *de = &ds[dst->size - dst->len - 2]; - - if(ss >= se) { - *ds = '\0'; - reader->pos = 0; - reader->read_len = 0; - reader->read_buffer[reader->read_len] = '\0'; - return false; - } - - // copy all bytes to buffer - while(ss < se && ds < de && *ss != '\n') { - *ds++ = *ss++; - dst->len++; - } - - // if we have a newline, return the buffer - if(ss < se && ds < de && *ss == '\n') { - // newline found in the r->read_buffer - - *ds++ = *ss++; // copy the newline too - dst->len++; - - *ds = '\0'; - - reader->pos = ss - reader->read_buffer; - return true; - } - - reader->pos = 0; - reader->read_len = 0; - reader->read_buffer[reader->read_len] = '\0'; - return false; -} bool plugin_is_enabled(struct plugind *cd); @@ -315,6 +298,10 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl); } +#ifdef ENABLE_H2O + parser->h2o_ctx = rpt->h2o_ctx; +#endif + pluginsd_keywords_init(parser, PARSER_INIT_STREAMING); rrd_collector_started(); @@ -323,43 +310,59 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - bool compressed_connection = false; + { + bool compressed_connection = rrdpush_decompression_initialize(rpt); -#ifdef ENABLE_RRDPUSH_COMPRESSION - if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { - compressed_connection = true; - rrdpush_decompressor_reset(&rpt->decompressor); - } - else - rrdpush_decompressor_destroy(&rpt->decompressor); + buffered_reader_init(&rpt->reader); + +#ifdef NETDATA_LOG_STREAM_RECEIVE + { + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname( + rpt->host) : "unknown" + ); + parser->user.stream_log_fp = fopen(filename, "w"); + parser->user.stream_log_repertoire = PARSER_REP_METADATA; + } #endif - buffered_reader_init(&rpt->reader); + CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); - BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); - while(!receiver_should_stop(rpt)) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line), + ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser), + ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser), + ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); - if(!buffered_reader_next_line(&rpt->reader, buffer)) { - bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt); + while(!receiver_should_stop(rpt)) { - if(unlikely(!have_new_data)) { - receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, false); - break; - } + if(!buffered_reader_next_line(&rpt->reader, buffer)) { + STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; - continue; - } + bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) + : receiver_read_uncompressed(rpt, &reason); - if (unlikely(parser_action(parser, buffer->buffer))) { - receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); - break; - } + if(unlikely(!have_new_data)) { + receiver_set_exit_reason(rpt, reason, false); + break; + } - buffer->len = 0; - buffer->buffer[0] = '\0'; - } - buffer_free(buffer); - result = parser->user.data_collections_count; + continue; + } + + if(unlikely(parser_action(parser, buffer->buffer))) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); + break; + } + + buffer->len = 0; + buffer->buffer[0] = '\0'; + } + result = parser->user.data_collections_count; + } // free parser with the pop function netdata_thread_cleanup_pop(1); @@ -400,10 +403,10 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) { if (rpt->config.alarms_delay > 0) { host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay; - netdata_log_health( - "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", - rrdhost_hostname(host), - (int64_t) rpt->config.alarms_delay); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", + rrdhost_hostname(host), + (int64_t) rpt->config.alarms_delay); } } @@ -521,26 +524,31 @@ static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *r 5); } -void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) { - - log_stream_connection(rpt->client_ip, rpt->client_port, - (rpt->key && *rpt->key)? rpt->key : "-", - (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-", - (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-", - status); - - netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " - "%s. " - "STATUS: %s%s%s%s" - , rpt->hostname - , rpt->client_ip, rpt->client_port - , msg - , status - , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":"" - , stream_handshake_error_to_string(rpt->exit.reason) - , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":"" - ); - +void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) { + // this function may be called BEFORE we spawn the receiver thread + // so, we need to add the fields again (it does not harm) + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip), + ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port), + ND_LOG_FIELD_TXT(NDF_NIDL_NODE, (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""), + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status), + ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_from_child_msgid), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + nd_log(NDLS_ACCESS, priority, "api_key:'%s' machine_guid:'%s' msg:'%s'" + , (rpt->key && *rpt->key)? rpt->key : "" + , (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "" + , msg); + + nd_log(NDLS_DAEMON, priority, "STREAM_RECEIVER for '%s': %s %s%s%s" + , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "" + , msg + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":"" + , stream_handshake_error_to_string(rpt->exit.reason) + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":"" + ); } static void rrdpush_receive(struct receiver_state *rpt) @@ -611,11 +619,19 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step); rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step); -#ifdef ENABLE_RRDPUSH_COMPRESSION rpt->config.rrdpush_compression = default_rrdpush_compression_enabled; rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression); rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression); -#endif // ENABLE_RRDPUSH_COMPRESSION + + bool is_ephemeral = false; + is_ephemeral = appconfig_get_boolean(&stream_config, rpt->key, "is ephemeral node", CONFIG_BOOLEAN_NO); + is_ephemeral = appconfig_get_boolean(&stream_config, rpt->machine_guid, "is ephemeral node", is_ephemeral); + + if(rpt->config.rrdpush_compression) { + char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", RRDPUSH_COMPRESSION_ALGORITHMS_ORDER); + order = appconfig_get(&stream_config, rpt->machine_guid, "compression algorithms order", order); + rrdpush_parse_compression_order(rpt, order); + } (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); @@ -623,39 +639,46 @@ static void rrdpush_receive(struct receiver_state *rpt) { // this will also update the host with our system_info RRDHOST *host = rrdhost_find_or_create( - rpt->hostname - , rpt->registry_hostname - , rpt->machine_guid - , rpt->os - , rpt->timezone - , rpt->abbrev_timezone - , rpt->utc_offset - , rpt->tags - , rpt->program_name - , rpt->program_version - , rpt->config.update_every - , rpt->config.history - , rpt->config.mode - , (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO) - , (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key) - , rpt->config.rrdpush_destination - , rpt->config.rrdpush_api_key - , rpt->config.rrdpush_send_charts_matching - , rpt->config.rrdpush_enable_replication - , rpt->config.rrdpush_seconds_to_replicate - , rpt->config.rrdpush_replication_step - , rpt->system_info - , 0 - ); + rpt->hostname, + rpt->registry_hostname, + rpt->machine_guid, + rpt->os, + rpt->timezone, + rpt->abbrev_timezone, + rpt->utc_offset, + rpt->tags, + rpt->program_name, + rpt->program_version, + rpt->config.update_every, + rpt->config.history, + rpt->config.mode, + (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO), + (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && + *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && + *rpt->config.rrdpush_api_key), + rpt->config.rrdpush_destination, + rpt->config.rrdpush_api_key, + rpt->config.rrdpush_send_charts_matching, + rpt->config.rrdpush_enable_replication, + rpt->config.rrdpush_seconds_to_replicate, + rpt->config.rrdpush_replication_step, + rpt->system_info, + 0); if(!host) { - rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION"); + rrdpush_receive_log_status( + rpt,"failed to find/create host structure, rejecting connection", + RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR); + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR); goto cleanup; } if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { - rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER"); + rrdpush_receive_log_status( + rpt, "host is initializing, retry later", + RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS, NDLP_NOTICE); + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION); goto cleanup; } @@ -664,7 +687,10 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->system_info = NULL; if(!rrdhost_set_receiver(host, rpt)) { - rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION"); + rrdpush_receive_log_status( + rpt, "host is already served by another receiver", + RRDPUSH_STATUS_DUPLICATE_RECEIVER, NDLP_INFO); + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING); goto cleanup; } @@ -709,12 +735,7 @@ static void rrdpush_receive(struct receiver_state *rpt) snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); -#ifdef ENABLE_RRDPUSH_COMPRESSION - if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { - if (!rpt->config.rrdpush_compression) - rpt->capabilities &= ~STREAM_CAP_COMPRESSION; - } -#endif // ENABLE_RRDPUSH_COMPRESSION + rrdpush_select_receiver_compression_algorithm(rpt); { // netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); @@ -737,19 +758,32 @@ static void rrdpush_receive(struct receiver_state *rpt) } netdata_log_debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); - ssize_t bytes_sent = send_timeout( +#ifdef ENABLE_H2O + if (is_h2o_rrdpush(rpt)) { + h2o_stream_write(rpt->h2o_ctx, initial_response, strlen(initial_response)); + } else { +#endif + ssize_t bytes_sent = send_timeout( #ifdef ENABLE_HTTPS - &rpt->ssl, + &rpt->ssl, #endif - rpt->fd, initial_response, strlen(initial_response), 0, 60); - - if(bytes_sent != (ssize_t)strlen(initial_response)) { - internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response)); - rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION"); - goto cleanup; + rpt->fd, initial_response, strlen(initial_response), 0, 60); + + if(bytes_sent != (ssize_t)strlen(initial_response)) { + internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response)); + rrdpush_receive_log_status( + rpt, "cannot reply back, dropping connection", + RRDPUSH_STATUS_CANT_REPLY, NDLP_ERR); + goto cleanup; + } +#ifdef ENABLE_H2O } +#endif } +#ifdef ENABLE_H2O + unless_h2o_rrdpush(rpt) +#endif { // remove the non-blocking flag from the socket if(sock_delnonblock(rpt->fd) < 0) @@ -770,17 +804,22 @@ static void rrdpush_receive(struct receiver_state *rpt) , rpt->fd); } - rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED"); + rrdpush_receive_log_status( + rpt, "connected and ready to receive data", + RRDPUSH_STATUS_CONNECTED, NDLP_INFO); #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // new child connected if (netdata_cloud_enabled) - aclk_host_state_update(rpt->host, 1); + aclk_host_state_update(rpt->host, 1, 1); #endif rrdhost_set_is_parent_label(); + if (is_ephemeral) + rrdhost_option_set(rpt->host, RRDHOST_OPTION_EPHEMERAL_HOST); + // let it reconnect to parent immediately rrdpush_reset_destinations_postpone_time(rpt->host); @@ -796,15 +835,17 @@ static void rrdpush_receive(struct receiver_state *rpt) { char msg[100 + 1]; - snprintfz(msg, 100, "disconnected (completed %zu updates)", count); - rrdpush_receive_log_status(rpt, msg, "DISCONNECTED"); + snprintfz(msg, sizeof(msg) - 1, "disconnected (completed %zu updates)", count); + rrdpush_receive_log_status( + rpt, msg, + RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING); } #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // a child disconnected if (netdata_cloud_enabled) - aclk_host_state_update(rpt->host, 0); + aclk_host_state_update(rpt->host, 0, 1); #endif cleanup: @@ -828,19 +869,64 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { rrdhost_set_is_parent_label(); } +static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) { + struct receiver_state *rpt = ptr; + if(!rpt) + return false; + + stream_capabilities_to_string(wb, rpt->capabilities); + return true; +} + +static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) { + struct receiver_state *rpt = ptr; + if(!rpt) + return false; + +#ifdef ENABLE_HTTPS + buffer_strcat(wb, SSL_connection(&rpt->ssl) ? "https" : "http"); +#else + buffer_strcat(wb, "http"); +#endif + return true; +} + void *rrdpush_receiver_thread(void *ptr) { netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr); - worker_register("STREAMRCV"); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); + { + worker_register("STREAMRCV"); + + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, + "received bytes", "bytes/s", + WORKER_METRIC_INCREMENT); + + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, + "uncompressed bytes", "bytes/s", + WORKER_METRIC_INCREMENT); - struct receiver_state *rpt = (struct receiver_state *)ptr; - rpt->tid = gettid(); - netdata_log_info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + "replication completion", "%", + WORKER_METRIC_ABSOLUTE); - rrdpush_receive(rpt); + struct receiver_state *rpt = (struct receiver_state *) ptr; + rpt->tid = gettid(); + + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip), + ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port), + ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname), + ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt), + ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip + , rpt->client_port); + + rrdpush_receive(rpt); + } netdata_thread_cleanup_pop(1); return NULL; diff --git a/streaming/replication.c b/streaming/replication.c index ffb6b3def..bc34361b3 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -168,7 +168,7 @@ static struct replication_query *replication_query_prepare( size_t count = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) { - if (unlikely(!rd || !rd_dfe.item || !rrddim_check_exposed(rd))) + if (unlikely(!rd || !rd_dfe.item || !rrddim_check_upstream_exposed(rd))) continue; if (unlikely(rd_dfe.counter >= q->dimensions)) { @@ -213,31 +213,38 @@ static struct replication_query *replication_query_prepare( } static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) { - NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + bool with_slots = (capabilities & STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; RRDDIM *rd; rrddim_foreach_read(rd, st){ - if (!rrddim_check_exposed(rd)) continue; + if (!rrddim_check_upstream_exposed(rd)) continue; - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '", - sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); - buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + - (usec_t) rd->collector.last_collected_time.tv_usec); + buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + + (usec_t) rd->collector.last_collected_time.tv_usec); buffer_fast_strcat(wb, " ", 1); - buffer_print_int64_encoded(wb, encoding, rd->collector.last_collected_value); + buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); buffer_fast_strcat(wb, " ", 1); - buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_calculated_value); + buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_calculated_value); buffer_fast_strcat(wb, " ", 1); - buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_stored_value); + buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_stored_value); buffer_fast_strcat(wb, "\n", 1); } rrddim_foreach_done(rd); buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1); - buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec); + buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec); + buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec); buffer_fast_strcat(wb, "\n", 1); } @@ -313,7 +320,8 @@ static void replication_query_align_to_optimal_before(struct replication_query * static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { replication_query_align_to_optimal_before(q); - NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; time_t after = q->query.after; time_t before = q->query.before; size_t dimensions = q->dimensions; @@ -344,8 +352,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s if(max_skip <= 0) { d->skip = true; - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query " "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd), @@ -394,14 +402,15 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s fix_min_start_time = min_end_time - min_update_every; #ifdef NETDATA_INTERNAL_CHECKS - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' " - "misaligned dimensions, " - "update every (min: %ld, max: %ld), " - "start time (min: %ld, max: %ld), " - "end time (min %ld, max %ld), " - "now %ld, last end time sent %ld, " - "min start time is fixed to %ld", + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "REPLAY WARNING: 'host:%s/chart:%s' " + "misaligned dimensions, " + "update every (min: %ld, max: %ld), " + "start time (min: %ld, max: %ld), " + "end time (min %ld, max %ld), " + "now %ld, last end time sent %ld, " + "min start time is fixed to %ld", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), min_update_every, max_update_every, min_start_time, max_start_time, @@ -444,12 +453,19 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s } last_end_time_in_buffer = min_end_time; - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4); - buffer_print_uint64_encoded(wb, encoding, min_start_time); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '' ", 4); + buffer_print_uint64_encoded(wb, integer_encoding, min_start_time); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, min_end_time); + buffer_print_uint64_encoded(wb, integer_encoding, min_end_time); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, wall_clock_time); + buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); buffer_fast_strcat(wb, "\n", 1); // output the replay values for this time @@ -462,10 +478,17 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s !storage_point_is_unset(d->sp) && !storage_point_is_gap(d->sp))) { - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET, sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, d->rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " \"", 2); buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id)); buffer_fast_strcat(wb, "\" ", 2); - buffer_print_netdata_double_encoded(wb, encoding, d->sp.sum); + buffer_print_netdata_double_encoded(wb, integer_encoding, d->sp.sum); buffer_fast_strcat(wb, " ", 1); buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED); buffer_fast_strcat(wb, "\n", 1); @@ -595,7 +618,8 @@ void replication_response_cancel_and_finalize(struct replication_query *q) { static bool sender_is_still_connected_for_this_request(struct replication_request *rq); bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) { - NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; struct replication_request *rq = q->rq; RRDSET *st = q->st; RRDHOST *host = st->rrdhost; @@ -605,12 +629,17 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size // holding the host's buffer lock for too long BUFFER *wb = sender_start(host->sender); - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "'\n", 2); -// buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); - bool locked_data_collection = q->query.locked_data_collection; q->query.locked_data_collection = false; @@ -634,19 +663,19 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size // last end time of the data we sent buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1); - buffer_print_int64_encoded(wb, encoding, st->update_every); + buffer_print_int64_encoded(wb, integer_encoding, st->update_every); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, db_first_entry); + buffer_print_uint64_encoded(wb, integer_encoding, db_first_entry); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, db_last_entry); + buffer_print_uint64_encoded(wb, integer_encoding, db_last_entry); buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7); - buffer_print_uint64_encoded(wb, encoding, after); + buffer_print_uint64_encoded(wb, integer_encoding, after); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, before); + buffer_print_uint64_encoded(wb, integer_encoding, before); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, wall_clock_time); + buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); buffer_fast_strcat(wb, "\n", 1); worker_is_busy(WORKER_JOB_BUFFER_COMMIT); @@ -664,7 +693,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size rrdhost_sender_replicating_charts_minus_one(st->rrdhost); if(!finished_with_gap) - st->upstream_resync_time_s = 0; + st->rrdpush.sender.resync_time_s = 0; #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", @@ -729,8 +758,8 @@ static void replicate_log_request(struct replication_request_details *r, const c #ifdef NETDATA_INTERNAL_CHECKS internal_error(true, #else - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "REPLAY ERROR: 'host:%s/chart:%s' child sent: " "db from %ld to %ld%s, wall clock time %ld, " @@ -793,7 +822,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c #endif // NETDATA_LOG_REPLICATION_REQUESTS char buffer[2048 + 1]; - snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", + snprintfz(buffer, sizeof(buffer) - 1, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", rrdset_id(st), r->wanted.start_streaming ? "true" : "false", (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index a42bc13a0..7c1df2cad 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -39,9 +39,9 @@ struct config stream_config = { }; unsigned int default_rrdpush_enabled = 0; -#ifdef ENABLE_RRDPUSH_COMPRESSION +STREAM_CAPABILITIES globally_disabled_capabilities = STREAM_CAP_NONE; + unsigned int default_rrdpush_compression_enabled = 1; -#endif char *default_rrdpush_destination = NULL; char *default_rrdpush_api_key = NULL; char *default_rrdpush_send_charts_matching = NULL; @@ -57,53 +57,16 @@ static void load_stream_conf() { errno = 0; char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf"); if(!appconfig_load(&stream_config, filename, 0, NULL)) { - netdata_log_info("CONFIG: cannot load user config '%s'. Will try stock config.", filename); + nd_log_daemon(NDLP_NOTICE, "CONFIG: cannot load user config '%s'. Will try stock config.", filename); freez(filename); filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf"); if(!appconfig_load(&stream_config, filename, 0, NULL)) - netdata_log_info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename); + nd_log_daemon(NDLP_NOTICE, "CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename); } freez(filename); } -STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { - - // we can have DATA_WITH_ML when INTERPOLATED is available - bool ml_capability = true; - - if(host && sender) { - // we have DATA_WITH_ML capability - // we should remove the DATA_WITH_ML capability if our database does not have anomaly info - // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML - netdata_mutex_lock(&host->receiver_lock); - - if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML)) - ml_capability = false; - - netdata_mutex_unlock(&host->receiver_lock); - } - - return STREAM_CAP_V1 | - STREAM_CAP_V2 | - STREAM_CAP_VN | - STREAM_CAP_VCAPS | - STREAM_CAP_HLABELS | - STREAM_CAP_CLAIM | - STREAM_CAP_CLABELS | - STREAM_CAP_FUNCTIONS | - STREAM_CAP_REPLICATION | - STREAM_CAP_BINARY | - STREAM_CAP_INTERPOLATED | - STREAM_HAS_COMPRESSION | -#ifdef NETDATA_TEST_DYNCFG - STREAM_CAP_DYNCFG | -#endif - (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) | - (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) | - 0; -} - bool rrdpush_receiver_needs_dbengine() { struct section *co; @@ -145,13 +108,27 @@ int rrdpush_init() { rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s); -#ifdef ENABLE_RRDPUSH_COMPRESSION default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enable compression", default_rrdpush_compression_enabled); -#endif + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_BROTLI] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "brotli compression level", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_BROTLI]); + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "zstd compression level", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD]); + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "lz4 compression acceleration", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4]); + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "gzip compression level", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP]); if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { - netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing."); + nd_log_daemon(NDLP_WARNING, "STREAM [send]: cannot enable sending thread - information is missing."); default_rrdpush_enabled = 0; } @@ -159,7 +136,7 @@ int rrdpush_init() { netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate); if(!netdata_ssl_validate_certificate_sender) - netdata_log_info("SSL: streaming senders will skip SSL certificates verification."); + nd_log_daemon(NDLP_NOTICE, "SSL: streaming senders will skip SSL certificates verification."); netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL); netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL); @@ -245,11 +222,13 @@ static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) { // Send the current chart definition. // Assumes that collector thread has already called sender_start for mutex / buffer state. static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { - bool replication_progress = false; + uint32_t version = rrdset_metadata_version(st); RRDHOST *host = st->rrdhost; + NUMBER_ENCODING integer_encoding = stream_has_capability(host->sender, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + bool with_slots = stream_has_capability(host->sender, STREAM_CAP_SLOTS) ? true : false; - rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + bool replication_progress = false; // properly set the name for the remote end to parse it char *name = ""; @@ -264,10 +243,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { } } + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_CHART, sizeof(PLUGINSD_KEYWORD_CHART) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot); + } + // send the chart buffer_sprintf( wb - , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n" + , " \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n" , rrdset_id(st) , name , rrdset_title(st) @@ -292,19 +278,25 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { // send the dimensions RRDDIM *rd; rrddim_foreach_read(rd, st) { + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_DIMENSION, sizeof(PLUGINSD_KEYWORD_DIMENSION) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + buffer_sprintf( - wb - , "DIMENSION \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n" - , rrddim_id(rd) - , rrddim_name(rd) - , rrd_algorithm_name(rd->algorithm) - , rd->multiplier - , rd->divisor - , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":"" - , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":"" - , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":"" + wb + , " \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n" + , rrddim_id(rd) + , rrddim_name(rd) + , rrd_algorithm_name(rd->algorithm) + , rd->multiplier + , rd->divisor + , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":"" + , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":"" + , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":"" ); - rrddim_set_exposed(rd); } rrddim_foreach_done(rd); @@ -339,7 +331,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { #endif } - st->upstream_resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + // we can set the exposed flag, after we commit the buffer + // because replication may pick it up prematurely + rrddim_foreach_read(rd, st) { + rrddim_metadata_exposed_upstream(rd, version); + } + rrddim_foreach_done(rd); + rrdset_metadata_exposed_upstream(st, version); + + st->rrdpush.sender.resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); return replication_progress; } @@ -349,7 +351,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "\" ", 2); - if(st->last_collected_time.tv_sec > st->upstream_resync_time_s) + if(st->last_collected_time.tv_sec > st->rrdpush.sender.resync_time_s) buffer_print_uint64(wb, st->usec_since_last_update); else buffer_fast_strcat(wb, "0", 1); @@ -361,7 +363,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta if(unlikely(!rrddim_check_updated(rd))) continue; - if(likely(rrddim_check_exposed(rd))) { + if(likely(rrddim_check_upstream_exposed_collector(rd))) { buffer_fast_strcat(wb, "SET \"", 5); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "\" = ", 4); @@ -372,7 +374,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); // we will include it in the next iteration - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrddim_metadata_updated(rd); } } rrddim_foreach_done(rd); @@ -390,12 +392,12 @@ bool rrdset_push_chart_definition_now(RRDSET *st) { RRDHOST *host = st->rrdhost; if(unlikely(!rrdhost_can_send_definitions_to_parent(host) - || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST)))) + || !should_send_chart_matching(st, rrdset_flag_get(st)))) { return false; + } BUFFER *wb = sender_start(host->sender); rrdpush_send_chart_definition(wb, st); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); return true; @@ -410,6 +412,7 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_ if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags)) return; + bool with_slots = stream_has_capability(rsb, STREAM_CAP_SLOTS) ? true : false; NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; BUFFER *wb = rsb->wb; @@ -419,7 +422,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_ if(unlikely(rsb->begin_v2_added)) buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id)); buffer_fast_strcat(wb, "' ", 2); buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every); @@ -436,7 +446,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_ rsb->begin_v2_added = true; } - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); @@ -485,11 +502,14 @@ void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, cons BUFFER *wb = sender_start(host->sender); - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d\n", plugin_name, module_name, job->name, job_status2str(job->status), job->state); - if (job->reason) + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state); + + if (job->reason && strlen(job->reason)) buffer_sprintf(wb, " \"%s\"", job->reason); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + buffer_strcat(wb, "\n"); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); sender_thread_buffer_free(); @@ -503,7 +523,7 @@ void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); sender_thread_buffer_free(); } @@ -522,24 +542,24 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) { rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); - netdata_log_error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); + nd_log_daemon(NDLP_NOTICE, "STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); } return (RRDSET_STREAM_BUFFER) { .wb = NULL, }; } else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) { - netdata_log_info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); + nd_log_daemon(NDLP_INFO, "STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) { BUFFER *wb = sender_start(host->sender); rrd_functions_expose_global_rrdpush(host, wb); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); } - RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST); - bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED); + bool exposed_upstream = rrdset_check_upstream_exposed(st); + RRDSET_FLAGS rrdset_flags = rrdset_flag_get(st); bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED); if(unlikely((exposed_upstream && replication_in_progress) || @@ -549,7 +569,6 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(!exposed_upstream)) { BUFFER *wb = sender_start(host->sender); replication_in_progress = rrdpush_send_chart_definition(wb, st); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); } if(replication_in_progress) @@ -597,7 +616,7 @@ void rrdpush_send_global_functions(RRDHOST *host) { rrd_functions_expose_global_rrdpush(host, wb); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); sender_thread_buffer_free(); } @@ -630,7 +649,7 @@ void rrdpush_send_dyncfg(RRDHOST *host) { } dfe_done(plug); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); sender_thread_buffer_free(); } @@ -656,7 +675,7 @@ void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, cons buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type)); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); sender_thread_buffer_free(); } @@ -669,6 +688,19 @@ void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const c buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_RESET " %s\n", plugin_name); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); @@ -709,11 +741,9 @@ int connect_to_one_of_destinations( if(d->postpone_reconnection_until > now) continue; - internal_error(true, + nd_log(NDLS_DAEMON, NDLP_DEBUG, "STREAM %s: connecting to '%s' (default port: %d)...", - rrdhost_hostname(host), - string2str(d->destination), - default_port); + rrdhost_hostname(host), string2str(d->destination), default_port); if (reconnects_counter) *reconnects_counter += 1; @@ -766,7 +796,7 @@ bool destinations_init_add_one(char *entry, void *data) { DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next); t->count++; - netdata_log_info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); + nd_log_daemon(NDLP_INFO, "STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); return false; // we return false, so that we will get all defined destinations } @@ -835,11 +865,6 @@ void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wai // ---------------------------------------------------------------------------- // rrdpush receiver thread -void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) { - netdata_log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid); -} - - static void rrdpush_sender_thread_spawn(RRDHOST *host) { sender_lock(host->sender); @@ -848,7 +873,7 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) { snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host)); if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender)) - netdata_log_error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); + nd_log_daemon(NDLP_ERR, "STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); else rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); } @@ -898,16 +923,21 @@ static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struc } void *rrdpush_receiver_thread(void *ptr); -int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) { +int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx) { if(!service_running(ABILITY_STREAMING_CONNECTIONS)) return rrdpush_receiver_too_busy_now(w); struct receiver_state *rpt = callocz(1, sizeof(*rpt)); rpt->last_msg_t = now_monotonic_sec(); - rpt->capabilities = STREAM_CAP_INVALID; rpt->hops = 1; + rpt->capabilities = STREAM_CAP_INVALID; + +#ifdef ENABLE_H2O + rpt->h2o_ctx = h2o_ctx; +#endif + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); @@ -1003,7 +1033,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false); if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) { - netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " + nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: " "request has parameter '%s' = '%s', which is not used." , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-" , rpt->client_ip, rpt->client_port @@ -1032,9 +1062,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!rpt->key || !*rpt->key) { rrdpush_receive_log_status( - rpt, - "request without an API key", - "NO API KEY PERMISSION DENIED"); + rpt, "request without an API key, rejecting connection", + RRDPUSH_STATUS_NO_API_KEY, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1042,9 +1071,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!rpt->hostname || !*rpt->hostname) { rrdpush_receive_log_status( - rpt, - "request without a hostname", - "NO HOSTNAME PERMISSION DENIED"); + rpt, "request without a hostname, rejecting connection", + RRDPUSH_STATUS_NO_HOSTNAME, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1055,9 +1083,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!rpt->machine_guid || !*rpt->machine_guid) { rrdpush_receive_log_status( - rpt, - "request without a machine GUID", - "NO MACHINE GUID PERMISSION DENIED"); + rpt, "request without a machine GUID, rejecting connection", + RRDPUSH_STATUS_NO_MACHINE_GUID, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1068,9 +1095,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (regenerate_guid(rpt->key, buf) == -1) { rrdpush_receive_log_status( - rpt, - "API key is not a valid UUID (use the command uuidgen to generate one)", - "INVALID API KEY PERMISSION DENIED"); + rpt, "API key is not a valid UUID (use the command uuidgen to generate one)", + RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1078,9 +1104,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (regenerate_guid(rpt->machine_guid, buf) == -1) { rrdpush_receive_log_status( - rpt, - "machine GUID is not a valid UUID", - "INVALID MACHINE GUID PERMISSION DENIED"); + rpt, "machine GUID is not a valid UUID", + RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1091,9 +1116,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!api_key_type || !*api_key_type) api_key_type = "unknown"; if(strcmp(api_key_type, "api") != 0) { rrdpush_receive_log_status( - rpt, - "API key is a machine GUID", - "INVALID API KEY PERMISSION DENIED"); + rpt, "API key is a machine GUID", + RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1101,9 +1125,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) { rrdpush_receive_log_status( - rpt, - "API key is not enabled", - "API KEY DISABLED PERMISSION DENIED"); + rpt, "API key is not enabled", + RRDPUSH_STATUS_API_KEY_DISABLED, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1119,9 +1142,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri simple_pattern_free(key_allow_from); rrdpush_receive_log_status( - rpt, - "API key is not allowed from this IP", - "NOT ALLOWED IP PERMISSION DENIED"); + rpt, "API key is not allowed from this IP", + RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1137,9 +1159,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (strcmp(machine_guid_type, "machine") != 0) { rrdpush_receive_log_status( - rpt, - "machine GUID is an API key", - "INVALID MACHINE GUID PERMISSION DENIED"); + rpt, "machine GUID is an API key", + RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1148,9 +1169,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) { rrdpush_receive_log_status( - rpt, - "machine GUID is not enabled", - "MACHINE GUID DISABLED PERMISSION DENIED"); + rpt, "machine GUID is not enabled", + RRDPUSH_STATUS_MACHINE_GUID_DISABLED, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1166,9 +1186,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri simple_pattern_free(machine_allow_from); rrdpush_receive_log_status( - rpt, - "machine GUID is not allowed from this IP", - "NOT ALLOWED IP PERMISSION DENIED"); + rpt, "machine GUID is not allowed from this IP", + RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1183,9 +1202,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri rrdpush_receiver_takeover_web_connection(w, rpt); rrdpush_receive_log_status( - rpt, - "machine GUID is my own", - "LOCALHOST PERMISSION DENIED"); + rpt, "machine GUID is my own", + RRDPUSH_STATUS_LOCALHOST, NDLP_DEBUG); char initial_response[HTTP_HEADER_SIZE + 1]; snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); @@ -1196,11 +1214,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri #endif rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { - netdata_log_error("STREAM '%s' [receive from [%s]:%s]: " - "failed to reply." - , rpt->hostname - , rpt->client_ip, rpt->client_port - ); + nd_log_daemon(NDLP_ERR, "STREAM '%s' [receive from [%s]:%s]: " + "failed to reply." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); } receiver_state_free(rpt); @@ -1221,14 +1239,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri spinlock_unlock(&spinlock); char msg[100 + 1]; - snprintfz(msg, 100, + snprintfz(msg, sizeof(msg) - 1, "rate limit, will accept new connection in %ld secs", (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t))); rrdpush_receive_log_status( - rpt, - msg, - "RATE LIMIT TRY LATER"); + rpt, msg, + RRDPUSH_STATUS_RATE_LIMIT, NDLP_NOTICE); receiver_state_free(rpt); return rrdpush_receiver_too_busy_now(w); @@ -1276,29 +1293,26 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri // we can proceed with this connection receiver_stale = false; - netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " - "stopped previous stale receiver to accept this one." - , rpt->hostname - , rpt->client_ip, rpt->client_port - ); + nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: " + "stopped previous stale receiver to accept this one." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); } if (receiver_working || receiver_stale) { // another receiver is already connected // try again later -#ifdef NETDATA_INTERNAL_CHECKS char msg[200 + 1]; - snprintfz(msg, 200, + snprintfz(msg, sizeof(msg) - 1, "multiple connections for same host, " - "old connection was used %ld secs ago%s", + "old connection was last used %ld secs ago%s", age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)"); rrdpush_receive_log_status( - rpt, - msg, - "ALREADY CONNECTED"); -#endif + rpt, msg, + RRDPUSH_STATUS_ALREADY_CONNECTED, NDLP_DEBUG); // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up buffer_flush(w->response.data); @@ -1308,8 +1322,6 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } } - netdata_log_debug(D_SYSTEM, "starting STREAM receive thread."); - rrdpush_receiver_takeover_web_connection(w, rpt); char tag[NETDATA_THREAD_TAG_MAX + 1]; @@ -1318,9 +1330,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) { rrdpush_receive_log_status( - rpt, - "can't create receiver thread", - "INTERNAL SERVER ERROR"); + rpt, "can't create receiver thread", + RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR); buffer_flush(w->response.data); buffer_strcat(w->response.data, "Can't handle this request"); @@ -1364,11 +1375,15 @@ static struct { { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, "DISCONNECTED SHUTDOWN REQUESTED" }, { STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" }, { STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" }, - { STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" }, + {STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR, "DISCONNECTED UNKNOWN SOCKET READ ERROR" }, { STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" }, { STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" }, { STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" }, { STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" }, + { STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER, "DISCONNECTED NOT SUFFICIENT READ BUFFER" }, + {STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF, "DISCONNECTED SOCKET EOF" }, + {STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED, "DISCONNECTED SOCKET READ FAILED" }, + {STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT, "DISCONNECTED SOCKET READ TIMEOUT" }, { 0, NULL }, }; @@ -1389,25 +1404,29 @@ static struct { STREAM_CAPABILITIES cap; const char *str; } capability_names[] = { - { STREAM_CAP_V1, "V1" }, - { STREAM_CAP_V2, "V2" }, - { STREAM_CAP_VN, "VN" }, - { STREAM_CAP_VCAPS, "VCAPS" }, - { STREAM_CAP_HLABELS, "HLABELS" }, - { STREAM_CAP_CLAIM, "CLAIM" }, - { STREAM_CAP_CLABELS, "CLABELS" }, - { STREAM_CAP_COMPRESSION, "COMPRESSION" }, - { STREAM_CAP_FUNCTIONS, "FUNCTIONS" }, - { STREAM_CAP_REPLICATION, "REPLICATION" }, - { STREAM_CAP_BINARY, "BINARY" }, - { STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, - { STREAM_CAP_IEEE754, "IEEE754" }, - { STREAM_CAP_DATA_WITH_ML, "ML" }, - { STREAM_CAP_DYNCFG, "DYN_CFG" }, - { 0 , NULL }, + {STREAM_CAP_V1, "V1" }, + {STREAM_CAP_V2, "V2" }, + {STREAM_CAP_VN, "VN" }, + {STREAM_CAP_VCAPS, "VCAPS" }, + {STREAM_CAP_HLABELS, "HLABELS" }, + {STREAM_CAP_CLAIM, "CLAIM" }, + {STREAM_CAP_CLABELS, "CLABELS" }, + {STREAM_CAP_LZ4, "LZ4" }, + {STREAM_CAP_FUNCTIONS, "FUNCTIONS" }, + {STREAM_CAP_REPLICATION, "REPLICATION" }, + {STREAM_CAP_BINARY, "BINARY" }, + {STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, + {STREAM_CAP_IEEE754, "IEEE754" }, + {STREAM_CAP_DATA_WITH_ML, "ML" }, + {STREAM_CAP_DYNCFG, "DYNCFG" }, + {STREAM_CAP_SLOTS, "SLOTS" }, + {STREAM_CAP_ZSTD, "ZSTD" }, + {STREAM_CAP_GZIP, "GZIP" }, + {STREAM_CAP_BROTLI, "BROTLI" }, + {0 , NULL }, }; -static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { +void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { for(size_t i = 0; capability_names[i].str ; i++) { if(caps & capability_names[i].cap) { buffer_strcat(wb, capability_names[i].str); @@ -1434,8 +1453,8 @@ void log_receiver_capabilities(struct receiver_state *rpt) { BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, rpt->capabilities); - netdata_log_info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", - rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb)); + nd_log_daemon(NDLP_INFO, "STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", + rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb)); buffer_free(wb); } @@ -1444,12 +1463,51 @@ void log_sender_capabilities(struct sender_state *s) { BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, s->capabilities); - netdata_log_info("STREAM %s [send to %s]: established link with negotiated capabilities: %s", - rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb)); + nd_log_daemon(NDLP_INFO, "STREAM %s [send to %s]: established link with negotiated capabilities: %s", + rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb)); buffer_free(wb); } +STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { + STREAM_CAPABILITIES disabled_capabilities = globally_disabled_capabilities; + + if(host && sender) { + // we have DATA_WITH_ML capability + // we should remove the DATA_WITH_ML capability if our database does not have anomaly info + // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML + netdata_mutex_lock(&host->receiver_lock); + + if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML)) + disabled_capabilities |= STREAM_CAP_DATA_WITH_ML; + + netdata_mutex_unlock(&host->receiver_lock); + + if(host->sender) + disabled_capabilities |= host->sender->disabled_capabilities; + } + + return (STREAM_CAP_V1 | + STREAM_CAP_V2 | + STREAM_CAP_VN | + STREAM_CAP_VCAPS | + STREAM_CAP_HLABELS | + STREAM_CAP_CLAIM | + STREAM_CAP_CLABELS | + STREAM_CAP_FUNCTIONS | + STREAM_CAP_REPLICATION | + STREAM_CAP_BINARY | + STREAM_CAP_INTERPOLATED | + STREAM_CAP_SLOTS | + STREAM_CAP_COMPRESSIONS_AVAILABLE | + #ifdef NETDATA_TEST_DYNCFG + STREAM_CAP_DYNCFG | + #endif + STREAM_CAP_IEEE754 | + STREAM_CAP_DATA_WITH_ML | + 0) & ~disabled_capabilities; +} + STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) { STREAM_CAPABILITIES caps = 0; @@ -1457,7 +1515,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS; else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM; else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS; - else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION; + else if(version <= STREAM_OLD_VERSION_LZ4) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_CAP_LZ4_AVAILABLE; else caps = version; if(caps & STREAM_CAP_VCAPS) @@ -1479,8 +1537,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH } int32_t stream_capabilities_to_vn(uint32_t caps) { - if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION; + if(caps & STREAM_CAP_LZ4) return STREAM_OLD_VERSION_LZ4; if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS; return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM) } - diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index c3c14233f..1459c881e 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -18,12 +18,14 @@ #define STREAM_OLD_VERSION_CLAIM 3 #define STREAM_OLD_VERSION_CLABELS 4 -#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production +#define STREAM_OLD_VERSION_LZ4 5 // ---------------------------------------------------------------------------- // capabilities negotiation typedef enum { + STREAM_CAP_NONE = 0, + // do not use the first 3 bits // they used to be versions 1, 2 and 3 // before we introduce capabilities @@ -38,7 +40,7 @@ typedef enum { STREAM_CAP_HLABELS = (1 << 7), // host labels supported STREAM_CAP_CLAIM = (1 << 8), // claiming supported STREAM_CAP_CLABELS = (1 << 9), // chart labels supported - STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported + STREAM_CAP_LZ4 = (1 << 10), // lz4 compression supported STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported STREAM_CAP_REPLICATION = (1 << 12), // replication supported STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data @@ -46,22 +48,47 @@ typedef enum { STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming + STREAM_CAP_SLOTS = (1 << 18), // the sender can appoint a unique slot for each chart + STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported + STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported + STREAM_CAP_BROTLI = (1 << 21), // BROTLI compression supported STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit // needed for negotiating errors between parent and child } STREAM_CAPABILITIES; -#ifdef ENABLE_RRDPUSH_COMPRESSION -#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION +#ifdef ENABLE_LZ4 +#define STREAM_CAP_LZ4_AVAILABLE STREAM_CAP_LZ4 +#else +#define STREAM_CAP_LZ4_AVAILABLE 0 +#endif // ENABLE_LZ4 + +#ifdef ENABLE_ZSTD +#define STREAM_CAP_ZSTD_AVAILABLE STREAM_CAP_ZSTD #else -#define STREAM_HAS_COMPRESSION 0 -#endif // ENABLE_RRDPUSH_COMPRESSION +#define STREAM_CAP_ZSTD_AVAILABLE 0 +#endif // ENABLE_ZSTD + +#ifdef ENABLE_BROTLI +#define STREAM_CAP_BROTLI_AVAILABLE STREAM_CAP_BROTLI +#else +#define STREAM_CAP_BROTLI_AVAILABLE 0 +#endif // ENABLE_BROTLI + +#define STREAM_CAP_COMPRESSIONS_AVAILABLE (STREAM_CAP_LZ4_AVAILABLE|STREAM_CAP_ZSTD_AVAILABLE|STREAM_CAP_BROTLI_AVAILABLE|STREAM_CAP_GZIP) + +extern STREAM_CAPABILITIES globally_disabled_capabilities; STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender); #define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability)) +static inline bool stream_has_more_than_one_capability_of(STREAM_CAPABILITIES caps, STREAM_CAPABILITIES mask) { + STREAM_CAPABILITIES common = (STREAM_CAPABILITIES)(caps & mask); + return (common & (common - 1)) != 0 && common != 0; +} + // ---------------------------------------------------------------------------- // stream handshake @@ -79,6 +106,31 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender); #define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later." #define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later." +#define RRDPUSH_STATUS_CONNECTED "CONNECTED" +#define RRDPUSH_STATUS_ALREADY_CONNECTED "ALREADY CONNECTED" +#define RRDPUSH_STATUS_DISCONNECTED "DISCONNECTED" +#define RRDPUSH_STATUS_RATE_LIMIT "RATE LIMIT TRY LATER" +#define RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS "INITIALIZATION IN PROGRESS RETRY LATER" +#define RRDPUSH_STATUS_INTERNAL_SERVER_ERROR "INTERNAL SERVER ERROR DROPPING CONNECTION" +#define RRDPUSH_STATUS_DUPLICATE_RECEIVER "DUPLICATE RECEIVER DROPPING CONNECTION" +#define RRDPUSH_STATUS_CANT_REPLY "CANT REPLY DROPPING CONNECTION" +#define RRDPUSH_STATUS_NO_HOSTNAME "NO HOSTNAME PERMISSION DENIED" +#define RRDPUSH_STATUS_NO_API_KEY "NO API KEY PERMISSION DENIED" +#define RRDPUSH_STATUS_INVALID_API_KEY "INVALID API KEY PERMISSION DENIED" +#define RRDPUSH_STATUS_NO_MACHINE_GUID "NO MACHINE GUID PERMISSION DENIED" +#define RRDPUSH_STATUS_MACHINE_GUID_DISABLED "MACHINE GUID DISABLED PERMISSION DENIED" +#define RRDPUSH_STATUS_INVALID_MACHINE_GUID "INVALID MACHINE GUID PERMISSION DENIED" +#define RRDPUSH_STATUS_API_KEY_DISABLED "API KEY DISABLED PERMISSION DENIED" +#define RRDPUSH_STATUS_NOT_ALLOWED_IP "NOT ALLOWED IP PERMISSION DENIED" +#define RRDPUSH_STATUS_LOCALHOST "LOCALHOST PERMISSION DENIED" +#define RRDPUSH_STATUS_PERMISSION_DENIED "PERMISSION DENIED" +#define RRDPUSH_STATUS_BAD_HANDSHAKE "BAD HANDSHAKE" +#define RRDPUSH_STATUS_TIMEOUT "TIMEOUT" +#define RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION "CANT UPGRADE CONNECTION" +#define RRDPUSH_STATUS_SSL_ERROR "SSL ERROR" +#define RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE "INVALID SSL CERTIFICATE" +#define RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION "CANT ESTABLISH SSL CONNECTION" + typedef enum { STREAM_HANDSHAKE_OK_V3 = 3, // v3+ STREAM_HANDSHAKE_OK_V2 = 2, // v2 @@ -101,11 +153,16 @@ typedef enum { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15, STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17, - STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18, + STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR = -18, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20, STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21, STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22, + STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER = -23, + STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF = -24, + STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED = -25, + STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT = -26, + STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE = -27, } STREAM_HANDSHAKE; @@ -120,100 +177,7 @@ typedef struct { char *kernel_version; } stream_encoded_t; -#ifdef ENABLE_RRDPUSH_COMPRESSION -// signature MUST end with a newline -#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) -#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) -#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4 - -struct compressor_state { - bool initialized; - char *compression_result_buffer; - size_t compression_result_buffer_size; - struct { - void *lz4_stream; - char *input_ring_buffer; - size_t input_ring_buffer_size; - size_t input_ring_buffer_pos; - } stream; - size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer); - void (*destroy)(struct compressor_state **state); -}; - -void rrdpush_compressor_reset(struct compressor_state *state); -void rrdpush_compressor_destroy(struct compressor_state *state); -size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out); - -struct decompressor_state { - bool initialized; - size_t signature_size; - size_t total_compressed; - size_t total_uncompressed; - size_t packet_count; - struct { - void *lz4_stream; - char *buffer; - size_t size; - size_t write_at; - size_t read_at; - } stream; -}; - -void rrdpush_decompressor_destroy(struct decompressor_state *state); -void rrdpush_decompressor_reset(struct decompressor_state *state); -size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); - -static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) { - if (unlikely(!data || !data_size)) - return 0; - - if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE)) - return 0; - - uint32_t sign = *(uint32_t *)data; - if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE)) - return 0; - - size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7)); - return length; -} - -static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) { - if(unlikely(state->stream.read_at != state->stream.write_at)) - fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); - - return rrdpush_decompress_decode_header(header, header_size); -} - -static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) { - if(unlikely(state->stream.read_at > state->stream.write_at)) - fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); - - return state->stream.write_at - state->stream.read_at; -} - -static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) { - if (unlikely(!state || !size || !dst)) - return 0; - - size_t remaining = rrdpush_decompressed_bytes_in_buffer(state); - - if(unlikely(!remaining)) - return 0; - - size_t bytes_to_return = size; - if(bytes_to_return > remaining) - bytes_to_return = remaining; - - memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return); - state->stream.read_at += bytes_to_return; - - if(unlikely(state->stream.read_at > state->stream.write_at)) - fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); - - return bytes_to_return; -} -#endif +#include "compression.h" // Thread-local storage // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. @@ -223,6 +187,7 @@ typedef enum __attribute__((packed)) { STREAM_TRAFFIC_TYPE_FUNCTIONS, STREAM_TRAFFIC_TYPE_METADATA, STREAM_TRAFFIC_TYPE_DATA, + STREAM_TRAFFIC_TYPE_DYNCFG, // terminator STREAM_TRAFFIC_TYPE_MAX, @@ -230,7 +195,6 @@ typedef enum __attribute__((packed)) { typedef enum __attribute__((packed)) { SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown - SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression } SENDER_FLAGS; struct function_payload_state { @@ -263,6 +227,7 @@ struct sender_state { char read_buffer[PLUGINSD_LINE_MAX + 1]; ssize_t read_len; STREAM_CAPABILITIES capabilities; + STREAM_CAPABILITIES disabled_capabilities; size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX]; @@ -274,9 +239,12 @@ struct sender_state { uint16_t hops; -#ifdef ENABLE_RRDPUSH_COMPRESSION + struct line_splitter line; struct compressor_state compressor; -#endif // ENABLE_RRDPUSH_COMPRESSION + +#ifdef NETDATA_LOG_STREAM_SENDER + FILE *stream_log_fp; +#endif #ifdef ENABLE_HTTPS NETDATA_SSL ssl; // structure used to encrypt the connection @@ -306,6 +274,8 @@ struct sender_state { usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC time_t last_buffer_recreate_s; // true when the sender buffer should be re-created } atomic; + + int parent_using_h2o; }; #define sender_lock(sender) spinlock_lock(&(sender)->spinlock) @@ -362,19 +332,6 @@ typedef struct stream_node_instance { } STREAM_NODE_INSTANCE; */ -struct buffered_reader { - ssize_t read_len; - ssize_t pos; - char read_buffer[PLUGINSD_LINE_MAX + 1]; -}; - -bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst); -static inline void buffered_reader_init(struct buffered_reader *reader) { - reader->read_buffer[0] = '\0'; - reader->read_len = 0; - reader->pos = 0; -} - struct receiver_state { RRDHOST *host; pid_t tid; @@ -421,6 +378,7 @@ struct receiver_state { time_t rrdpush_replication_step; char *rrdpush_destination; // DONT FREE - it is allocated in appconfig unsigned int rrdpush_compression; + STREAM_CAPABILITIES compression_priorities[COMPRESSION_ALGORITHM_MAX]; } config; #ifdef ENABLE_HTTPS @@ -429,17 +387,24 @@ struct receiver_state { time_t replication_first_time_t; -#ifdef ENABLE_RRDPUSH_COMPRESSION struct decompressor_state decompressor; -#endif // ENABLE_RRDPUSH_COMPRESSION /* struct { uint32_t count; STREAM_NODE_INSTANCE *array; } instances; */ + +#ifdef ENABLE_H2O + void *h2o_ctx; +#endif }; +#ifdef ENABLE_H2O +#define is_h2o_rrdpush(x) ((x)->h2o_ctx != NULL) +#define unless_h2o_rrdpush(x) if(!is_h2o_rrdpush(x)) +#endif + struct rrdpush_destinations { STRING *destination; bool ssl; @@ -453,9 +418,7 @@ struct rrdpush_destinations { }; extern unsigned int default_rrdpush_enabled; -#ifdef ENABLE_RRDPUSH_COMPRESSION extern unsigned int default_rrdpush_compression_enabled; -#endif // ENABLE_RRDPUSH_COMPRESSION extern char *default_rrdpush_destination; extern char *default_rrdpush_api_key; extern char *default_rrdpush_send_charts_matching; @@ -498,11 +461,10 @@ void rrdpush_send_dyncfg(RRDHOST *host); #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended -int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string); +int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx); void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait); void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva); -void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); int connect_to_one_of_destinations( RRDHOST *host, int default_port, @@ -514,18 +476,15 @@ int connect_to_one_of_destinations( void rrdpush_signal_sender_to_wake_up(struct sender_state *s); -#ifdef ENABLE_RRDPUSH_COMPRESSION -struct compressor_state *create_compressor(); -#endif // ENABLE_RRDPUSH_COMPRESSION - void rrdpush_reset_destinations_postpone_time(RRDHOST *host); const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error); void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key); -void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status); +void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority); void log_receiver_capabilities(struct receiver_state *rpt); void log_sender_capabilities(struct sender_state *s); STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender); int32_t stream_capabilities_to_vn(uint32_t caps); +void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps); void receiver_state_free(struct receiver_state *rpt); bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason); @@ -781,6 +740,13 @@ void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name); void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type); -void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);//x +void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags); +void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name); + +bool rrdpush_compression_initialize(struct sender_state *s); +bool rrdpush_decompression_initialize(struct receiver_state *rpt); +void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order); +void rrdpush_select_receiver_compression_algorithm(struct receiver_state *rpt); +void rrdpush_compression_deactivate(struct sender_state *s); #endif //NETDATA_RRDPUSH_H diff --git a/streaming/sender.c b/streaming/sender.c index 71f875034..09b67e968 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -1,31 +1,37 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" - -#define WORKER_SENDER_JOB_CONNECT 0 -#define WORKER_SENDER_JOB_PIPE_READ 1 -#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2 -#define WORKER_SENDER_JOB_EXECUTE 3 -#define WORKER_SENDER_JOB_SOCKET_SEND 4 -#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5 -#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6 -#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7 -#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8 -#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9 -#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10 -#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11 -#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12 -#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13 -#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14 -#define WORKER_SENDER_JOB_BUFFER_RATIO 15 -#define WORKER_SENDER_JOB_BYTES_RECEIVED 16 -#define WORKER_SENDER_JOB_BYTES_SENT 17 -#define WORKER_SENDER_JOB_REPLAY_REQUEST 18 -#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19 -#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20 - -#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21 -#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21 +#include "common.h" +#include "aclk/https_client.h" + +#define WORKER_SENDER_JOB_CONNECT 0 +#define WORKER_SENDER_JOB_PIPE_READ 1 +#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2 +#define WORKER_SENDER_JOB_EXECUTE 3 +#define WORKER_SENDER_JOB_SOCKET_SEND 4 +#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5 +#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6 +#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7 +#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8 +#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9 +#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10 +#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11 +#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12 +#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13 +#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14 +#define WORKER_SENDER_JOB_BUFFER_RATIO 15 +#define WORKER_SENDER_JOB_BYTES_RECEIVED 16 +#define WORKER_SENDER_JOB_BYTES_SENT 17 +#define WORKER_SENDER_JOB_BYTES_COMPRESSED 18 +#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19 +#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20 +#define WORKER_SENDER_JOB_REPLAY_REQUEST 21 +#define WORKER_SENDER_JOB_FUNCTION_REQUEST 22 +#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23 +#define WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION 24 + +#if WORKER_UTILIZATION_MAX_JOB_TYPES < 25 +#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 25 #endif extern struct config stream_config; @@ -66,21 +72,6 @@ BUFFER *sender_start(struct sender_state *s) { static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); -#ifdef ENABLE_RRDPUSH_COMPRESSION -/* -* In case of stream compression buffer overflow -* Inform the user through the error log file and -* deactivate compression by downgrading the stream protocol. -*/ -static inline void deactivate_compression(struct sender_state *s) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION); - netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it."); - s->flags &= ~SENDER_FLAG_COMPRESSION; - netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to); - rrdpush_sender_thread_close_socket(s->host); -} -#endif - #define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3 // Collector thread finishing a transmission @@ -102,13 +93,22 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) sender_lock(s); -// FILE *fp = fopen("/tmp/stream.txt", "a"); -// fprintf(fp, -// "\n--- SEND BEGIN: %s ----\n" -// "%s" -// "--- SEND END ----------------------------------------\n" -// , rrdhost_hostname(s->host), src); -// fclose(fp); +#ifdef NETDATA_LOG_STREAM_SENDER + if(type == STREAM_TRAFFIC_TYPE_METADATA) { + if(!s->stream_log_fp) { + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "/tmp/stream-sender-%s.txt", s->host ? rrdhost_hostname(s->host) : "unknown"); + + s->stream_log_fp = fopen(filename, "w"); + } + + fprintf(s->stream_log_fp, "\n--- SEND MESSAGE START: %s ----\n" + "%s" + "--- SEND MESSAGE END ----------------------------------------\n" + , rrdhost_hostname(s->host), src + ); + } +#endif if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", @@ -117,8 +117,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; } -#ifdef ENABLE_RRDPUSH_COMPRESSION - if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) { + if (s->compressor.initialized) { while(src_len) { size_t size_to_compress = src_len; @@ -143,28 +142,45 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) } } - char *dst; + const char *dst; size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst); if (!dst_len) { netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying", rrdhost_hostname(s->host), s->connected_to); - rrdpush_compressor_reset(&s->compressor); + rrdpush_compression_initialize(s); dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst); if(!dst_len) { netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression", rrdhost_hostname(s->host), s->connected_to); - deactivate_compression(s); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION); + rrdpush_compression_deactivate(s); + rrdpush_sender_thread_close_socket(s->host); sender_unlock(s); return; } } - if(cbuffer_add_unsafe(s->buffer, dst, dst_len)) + rrdpush_signature_t signature = rrdpush_compress_encode_signature(dst_len); + +#ifdef NETDATA_INTERNAL_CHECKS + // check if reversing the signature provides the same length + size_t decoded_dst_len = rrdpush_decompress_decode_signature((const char *)&signature, sizeof(signature)); + if(decoded_dst_len != dst_len) + fatal("RRDPUSH COMPRESSION: invalid signature, original payload %zu bytes, " + "compressed payload length %zu bytes, but signature says payload is %zu bytes", + size_to_compress, dst_len, decoded_dst_len); +#endif + + if(cbuffer_add_unsafe(s->buffer, (const char *)&signature, sizeof(signature))) s->flags |= SENDER_FLAG_OVERFLOW; - else - s->sent_bytes_on_this_connection_per_type[type] += dst_len; + else { + if(cbuffer_add_unsafe(s->buffer, dst, dst_len)) + s->flags |= SENDER_FLAG_OVERFLOW; + else + s->sent_bytes_on_this_connection_per_type[type] += dst_len + sizeof(signature); + } src = src + size_to_compress; src_len -= size_to_compress; @@ -174,12 +190,6 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) s->flags |= SENDER_FLAG_OVERFLOW; else s->sent_bytes_on_this_connection_per_type[type] += src_len; -#else - if(cbuffer_add_unsafe(s->buffer, src, src_len)) - s->flags |= SENDER_FLAG_OVERFLOW; - else - s->sent_bytes_on_this_connection_per_type[type] += src_len; -#endif replication_recalculate_buffer_used_ratio_unsafe(s); @@ -191,7 +201,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) sender_unlock(s); - if(signal_sender) + if(signal_sender && (!stream_has_capability(s, STREAM_CAP_INTERPOLATED) || type != STREAM_TRAFFIC_TYPE_DATA)) rrdpush_signal_sender_to_wake_up(s); } @@ -251,15 +261,17 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { RRDSET *st; rrdset_foreach_read(st, host) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - st->upstream_resync_time_s = 0; + st->rrdpush.sender.resync_time_s = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) - rrddim_clear_exposed(rd); + rrddim_metadata_exposed_upstream_clear(rd); rrddim_foreach_done(rd); + + rrdset_metadata_updated(st); } rrdset_foreach_done(st); @@ -342,8 +354,7 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { rrdpush_sender_charts_and_replication_reset(host); } -void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) -{ +void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) { se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz(""); se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz(""); se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz(""); @@ -351,128 +362,155 @@ void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz(""); } -void rrdpush_clean_encoded(stream_encoded_t *se) -{ - if (se->os_name) +void rrdpush_clean_encoded(stream_encoded_t *se) { + if (se->os_name) { freez(se->os_name); + se->os_name = NULL; + } - if (se->os_id) + if (se->os_id) { freez(se->os_id); + se->os_id = NULL; + } - if (se->os_version) + if (se->os_version) { freez(se->os_version); + se->os_version = NULL; + } - if (se->kernel_name) + if (se->kernel_name) { freez(se->kernel_name); + se->kernel_name = NULL; + } - if (se->kernel_version) + if (se->kernel_version) { freez(se->kernel_version); + se->kernel_version = NULL; + } } struct { const char *response; + const char *status; size_t length; int32_t version; bool dynamic; const char *error; int worker_job_id; int postpone_reconnect_seconds; - bool prevent_log; + ND_LOG_FIELD_PRIORITY priority; } stream_responses[] = { { .response = START_STREAMING_PROMPT_VN, .length = sizeof(START_STREAMING_PROMPT_VN) - 1, + .status = RRDPUSH_STATUS_CONNECTED, .version = STREAM_HANDSHAKE_OK_V3, // and above .dynamic = true, // dynamic = we will parse the version / capabilities .error = NULL, .worker_job_id = 0, .postpone_reconnect_seconds = 0, + .priority = NDLP_INFO, }, { .response = START_STREAMING_PROMPT_V2, .length = sizeof(START_STREAMING_PROMPT_V2) - 1, + .status = RRDPUSH_STATUS_CONNECTED, .version = STREAM_HANDSHAKE_OK_V2, .dynamic = false, .error = NULL, .worker_job_id = 0, .postpone_reconnect_seconds = 0, + .priority = NDLP_INFO, }, { .response = START_STREAMING_PROMPT_V1, .length = sizeof(START_STREAMING_PROMPT_V1) - 1, + .status = RRDPUSH_STATUS_CONNECTED, .version = STREAM_HANDSHAKE_OK_V1, .dynamic = false, .error = NULL, .worker_job_id = 0, .postpone_reconnect_seconds = 0, + .priority = NDLP_INFO, }, { .response = START_STREAMING_ERROR_SAME_LOCALHOST, .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1, + .status = RRDPUSH_STATUS_LOCALHOST, .version = STREAM_HANDSHAKE_ERROR_LOCALHOST, .dynamic = false, .error = "remote server rejected this stream, the host we are trying to stream is its localhost", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour - .prevent_log = true, + .priority = NDLP_DEBUG, }, { .response = START_STREAMING_ERROR_ALREADY_STREAMING, .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1, + .status = RRDPUSH_STATUS_ALREADY_CONNECTED, .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, .dynamic = false, .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minutes - .prevent_log = true, + .priority = NDLP_DEBUG, }, { .response = START_STREAMING_ERROR_NOT_PERMITTED, .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1, + .status = RRDPUSH_STATUS_PERMISSION_DENIED, .version = STREAM_HANDSHAKE_ERROR_DENIED, .dynamic = false, .error = "remote server denied access, probably we don't have the right API key?", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 1 * 60, // 1 minute + .priority = NDLP_ERR, }, { .response = START_STREAMING_ERROR_BUSY_TRY_LATER, .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1, + .status = RRDPUSH_STATUS_RATE_LIMIT, .version = STREAM_HANDSHAKE_BUSY_TRY_LATER, .dynamic = false, .error = "remote server is currently busy, we should try later", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minutes + .priority = NDLP_NOTICE, }, { .response = START_STREAMING_ERROR_INTERNAL_ERROR, .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1, + .status = RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, .version = STREAM_HANDSHAKE_INTERNAL_ERROR, .dynamic = false, .error = "remote server is encountered an internal error, we should try later", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 5 * 60, // 5 minutes + .priority = NDLP_CRIT, }, { .response = START_STREAMING_ERROR_INITIALIZATION, .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1, + .status = RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS, .version = STREAM_HANDSHAKE_INITIALIZATION, .dynamic = false, .error = "remote server is initializing, we should try later", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minute + .priority = NDLP_NOTICE, }, // terminator { .response = NULL, .length = 0, + .status = RRDPUSH_STATUS_BAD_HANDSHAKE, .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, .dynamic = false, .error = "remote node response is not understood, is it Netdata?", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 1 * 60, // 1 minute - .prevent_log = false, + .priority = NDLP_ERR, } }; @@ -502,8 +540,9 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender return true; } - bool prevent_log = stream_responses[i].prevent_log; + ND_LOG_FIELD_PRIORITY priority = stream_responses[i].priority; const char *error = stream_responses[i].error; + const char *status = stream_responses[i].status; int worker_job_id = stream_responses[i].worker_job_id; int delay = stream_responses[i].postpone_reconnect_seconds; @@ -512,19 +551,29 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender host->destination->reason = version; host->destination->postpone_reconnection_until = now_realtime_sec() + delay; - char buf[LOG_DATE_LENGTH]; - log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); - if(prevent_log) - internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", - rrdhost_hostname(host), s->connected_to, error, delay, buf); - else - netdata_log_error("STREAM %s [send to %s]: %s - will retry in %d secs, at %s", - rrdhost_hostname(host), s->connected_to, error, delay, buf); + char buf[RFC3339_MAX_LENGTH]; + rfc3339_datetime_ut(buf, sizeof(buf), host->destination->postpone_reconnection_until * USEC_PER_SEC, 0, false); + + nd_log(NDLS_DAEMON, priority, + "STREAM %s [send to %s]: %s - will retry in %d secs, at %s", + rrdhost_hostname(host), s->connected_to, error, delay, buf); return false; } +unsigned char alpn_proto_list[] = { + 18, 'n', 'e', 't', 'd', 'a', 't', 'a', '_', 's', 't', 'r', 'e', 'a', 'm', '/', '2', '.', '0', + 8, 'h', 't', 't', 'p', '/', '1', '.', '1' +}; + +#define CONN_UPGRADE_VAL "upgrade" + static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { #ifdef ENABLE_HTTPS RRDHOST *host = s->host; @@ -535,10 +584,16 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { if(!ssl_required) return true; - if (netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) { + if (netdata_ssl_open_ext(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket, alpn_proto_list, sizeof(alpn_proto_list))) { if(!netdata_ssl_connect(&host->sender->ssl)) { // couldn't connect + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_SSL_ERROR), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); rrdpush_sender_thread_close_socket(host); host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR; @@ -550,6 +605,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { security_test_certificate(host->sender->ssl.conn)) { // certificate is not valid + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid."); rrdpush_sender_thread_close_socket(host); @@ -561,6 +622,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { return true; } + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + netdata_log_error("SSL: failed to establish connection."); return false; @@ -570,6 +637,104 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { #endif } +static int rrdpush_http_upgrade_prelude(RRDHOST *host, struct sender_state *s) { + + char http[HTTP_HEADER_SIZE + 1]; + snprintfz(http, HTTP_HEADER_SIZE, + "GET " NETDATA_STREAM_URL HTTP_1_1 HTTP_ENDL + "Upgrade: " NETDATA_STREAM_PROTO_NAME HTTP_ENDL + "Connection: Upgrade" + HTTP_HDR_END); + + ssize_t bytes = send_timeout( +#ifdef ENABLE_HTTPS + &host->sender->ssl, +#endif + s->rrdpush_sender_socket, + http, + strlen(http), + 0, + 1000); + + bytes = recv_timeout( +#ifdef ENABLE_HTTPS + &host->sender->ssl, +#endif + s->rrdpush_sender_socket, + http, + HTTP_HEADER_SIZE, + 0, + 1000); + + if (bytes <= 0) { + error_report("Error reading from remote"); + return 1; + } + + rbuf_t buf = rbuf_create(bytes); + rbuf_push(buf, http, bytes); + + http_parse_ctx ctx; + http_parse_ctx_create(&ctx); + ctx.flags |= HTTP_PARSE_FLAG_DONT_WAIT_FOR_CONTENT; + + int rc; +// while((rc = parse_http_response(buf, &ctx)) == HTTP_PARSE_NEED_MORE_DATA); + rc = parse_http_response(buf, &ctx); + + if (rc != HTTP_PARSE_SUCCESS) { + error_report("Failed to parse HTTP response sent. (%d)", rc); + goto err_cleanup; + } + if (ctx.http_code == HTTP_RESP_MOVED_PERM) { + const char *hdr = get_http_header_by_name(&ctx, "location"); + if (hdr) + error_report("HTTP response is %d Moved Permanently (location: \"%s\") instead of expected %d Switching Protocols.", ctx.http_code, hdr, HTTP_RESP_SWITCH_PROTO); + else + error_report("HTTP response is %d instead of expected %d Switching Protocols.", ctx.http_code, HTTP_RESP_SWITCH_PROTO); + goto err_cleanup; + } + if (ctx.http_code == HTTP_RESP_NOT_FOUND) { + error_report("HTTP response is %d instead of expected %d Switching Protocols. Parent version too old.", ctx.http_code, HTTP_RESP_SWITCH_PROTO); + // TODO set some flag here that will signify parent is older version + // and to try connection without rrdpush_http_upgrade_prelude next time + goto err_cleanup; + } + if (ctx.http_code != HTTP_RESP_SWITCH_PROTO) { + error_report("HTTP response is %d instead of expected %d Switching Protocols", ctx.http_code, HTTP_RESP_SWITCH_PROTO); + goto err_cleanup; + } + + const char *hdr = get_http_header_by_name(&ctx, "connection"); + if (!hdr) { + error_report("Missing \"connection\" header in reply"); + goto err_cleanup; + } + if (strncmp(hdr, CONN_UPGRADE_VAL, strlen(CONN_UPGRADE_VAL))) { + error_report("Expected \"connection: " CONN_UPGRADE_VAL "\""); + goto err_cleanup; + } + + hdr = get_http_header_by_name(&ctx, "upgrade"); + if (!hdr) { + error_report("Missing \"upgrade\" header in reply"); + goto err_cleanup; + } + if (strncmp(hdr, NETDATA_STREAM_PROTO_NAME, strlen(NETDATA_STREAM_PROTO_NAME))) { + error_report("Expected \"upgrade: " NETDATA_STREAM_PROTO_NAME "\""); + goto err_cleanup; + } + + netdata_log_debug(D_STREAM, "Stream sender upgrade to \"" NETDATA_STREAM_PROTO_NAME "\" successful"); + rbuf_free(buf); + http_parse_ctx_destroy(&ctx); + return 0; +err_cleanup: + rbuf_free(buf); + http_parse_ctx_destroy(&ctx); + return 1; +} + static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) { struct timeval tv = { @@ -600,12 +765,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p // reset our capabilities to default s->capabilities = stream_our_capabilities(host, true); -#ifdef ENABLE_RRDPUSH_COMPRESSION - // If we don't want compression, remove it from our capabilities - if(!(s->flags & SENDER_FLAG_COMPRESSION)) - s->capabilities &= ~STREAM_CAP_COMPRESSION; -#endif // ENABLE_RRDPUSH_COMPRESSION - /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the version negotiation resulted in a high enough version. */ @@ -660,7 +819,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p "&NETDATA_SYSTEM_TOTAL_RAM=%s" "&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s" "&NETDATA_PROTOCOL_VERSION=%s" - " HTTP/1.1\r\n" + HTTP_1_1 HTTP_ENDL "User-Agent: %s/%s\r\n" "Accept: */*\r\n\r\n" , host->rrdpush_send_api_key @@ -715,6 +874,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p if(!rrdpush_sender_connect_ssl(s)) return false; + if (s->parent_using_h2o && rrdpush_http_upgrade_prelude(host, s)) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION); + rrdpush_sender_thread_close_socket(host); + host->destination->reason = STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE; + host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60; + return false; + } + ssize_t bytes, len = (ssize_t)strlen(http); bytes = send_timeout( @@ -728,9 +901,19 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p timeout); if(bytes <= 0) { // timeout is 0 + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); rrdpush_sender_thread_close_socket(host); - netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to); + + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", + rrdhost_hostname(host), s->connected_to); + host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT; host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60; return false; @@ -747,41 +930,62 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p timeout); if(bytes <= 0) { // timeout is 0 + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); rrdpush_sender_thread_close_socket(host); - netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to); + + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM %s [send to %s]: remote netdata does not respond.", + rrdhost_hostname(host), s->connected_to); + host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT; host->destination->postpone_reconnection_until = now_realtime_sec() + 30; return false; } if(sock_setnonblock(s->rrdpush_sender_socket) < 0) - netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); + nd_log(NDLS_DAEMON, NDLP_WARNING, + "STREAM %s [send to %s]: cannot set non-blocking mode for socket.", + rrdhost_hostname(host), s->connected_to); if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) - netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); + nd_log(NDLS_DAEMON, NDLP_WARNING, + "STREAM %s [send to %s]: cannot enlarge the socket buffer.", + rrdhost_hostname(host), s->connected_to); http[bytes] = '\0'; - netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http); if(!rrdpush_sender_validate_response(host, s, http, bytes)) return false; -#ifdef ENABLE_RRDPUSH_COMPRESSION - if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) - rrdpush_compressor_reset(&s->compressor); - else - rrdpush_compressor_destroy(&s->compressor); -#endif // ENABLE_RRDPUSH_COMPRESSION + rrdpush_compression_initialize(s); log_sender_capabilities(s); - netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CONNECTED), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "STREAM %s: connected to %s...", + rrdhost_hostname(host), s->connected_to); return true; } -static bool attempt_to_connect(struct sender_state *state) -{ +static bool attempt_to_connect(struct sender_state *state) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_to_parent_msgid), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + state->send_attempts = 0; // reset the bytes we have sent for this session @@ -950,6 +1154,12 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { void execute_commands(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_EXECUTE); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &s->line), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline; *end = 0; while( start < end && (newline = strchr(start, '\n')) ) { @@ -963,27 +1173,22 @@ void execute_commands(struct sender_state *s) { continue; } - netdata_log_access("STREAM: %d from '%s' for host '%s': %s", - gettid(), s->connected_to, rrdhost_hostname(s->host), start); - - // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); - - char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS); - - const char *keyword = get_word(words, num_words, 0); + s->line.count++; + s->line.num_words = quoted_strings_splitter_pluginsd(start, s->line.words, PLUGINSD_MAX_WORDS); + const char *command = get_word(s->line.words, s->line.num_words, 0); - if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { + if(command && (strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + nd_log(NDLS_ACCESS, NDLP_INFO, NULL); - char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1); - char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2); - char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3); + char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(s->line.words, s->line.num_words, 1); + char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(s->line.words, s->line.num_words, 2); + char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(s->line.words, s->line.num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", rrdhost_hostname(s->host), s->connected_to, - keyword, + command, transaction?transaction:"(unset)", timeout_s?timeout_s:"(unset)", function?function:"(unset)"); @@ -1021,9 +1226,12 @@ void execute_commands(struct sender_state *s) { memset(&s->function_payload, 0, sizeof(struct function_payload_state)); } } - else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { + else if (command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { + nd_log(NDLS_ACCESS, NDLP_INFO, NULL); + if (s->receiving_function_payload) { - netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword); + netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", + rrdhost_hostname(s->host), s->connected_to, command); s->receiving_function_payload = false; buffer_free(s->function_payload.payload); s->function_payload.payload = NULL; @@ -1031,14 +1239,14 @@ void execute_commands(struct sender_state *s) { // TODO send error response } - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); + char *transaction = get_word(s->line.words, s->line.num_words, 1); + char *timeout_s = get_word(s->line.words, s->line.num_words, 2); + char *function = get_word(s->line.words, s->line.num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", rrdhost_hostname(s->host), s->connected_to, - keyword, + command, transaction?transaction:"(unset)", timeout_s?timeout_s:"(unset)", function?function:"(unset)"); @@ -1047,30 +1255,32 @@ void execute_commands(struct sender_state *s) { s->receiving_function_payload = true; s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions); - s->function_payload.txid = strdupz(get_word(words, num_words, 1)); - s->function_payload.timeout = strdupz(get_word(words, num_words, 2)); - s->function_payload.fn_name = strdupz(get_word(words, num_words, 3)); + s->function_payload.txid = strdupz(get_word(s->line.words, s->line.num_words, 1)); + s->function_payload.timeout = strdupz(get_word(s->line.words, s->line.num_words, 2)); + s->function_payload.fn_name = strdupz(get_word(s->line.words, s->line.num_words, 3)); } - else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { + else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL); - char *transaction = get_word(words, num_words, 1); + char *transaction = get_word(s->line.words, s->line.num_words, 1); if(transaction && *transaction) rrd_function_cancel(transaction); } - else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { + else if (command && strcmp(command, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST); + nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL); - const char *chart_id = get_word(words, num_words, 1); - const char *start_streaming = get_word(words, num_words, 2); - const char *after = get_word(words, num_words, 3); - const char *before = get_word(words, num_words, 4); + const char *chart_id = get_word(s->line.words, s->line.num_words, 1); + const char *start_streaming = get_word(s->line.words, s->line.num_words, 2); + const char *after = get_word(s->line.words, s->line.num_words, 3); + const char *before = get_word(s->line.words, s->line.num_words, 4); if (!chart_id || !start_streaming || !after || !before) { netdata_log_error("STREAM %s [send to %s] %s command is incomplete" " (chart=%s, start_streaming=%s, after=%s, before=%s)", rrdhost_hostname(s->host), s->connected_to, - keyword, + command, chart_id ? chart_id : "(unset)", start_streaming ? start_streaming : "(unset)", after ? after : "(unset)", @@ -1085,12 +1295,14 @@ void execute_commands(struct sender_state *s) { } } else { - netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)"); + netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, s->line.words[0]?s->line.words[0]:"(unset)"); } + line_splitter_reset(&s->line); worker_is_busy(WORKER_SENDER_JOB_EXECUTE); start = newline + 1; } + if (start < end) { memmove(s->read_buffer, start, end-start); s->read_len = end - start; @@ -1245,6 +1457,14 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); rrdhost_clear_sender___while_having_sender_mutex(host); + +#ifdef NETDATA_LOG_STREAM_SENDER + if(host->sender->stream_log_fp) { + fclose(host->sender->stream_log_fp); + host->sender->stream_log_fp = NULL; + } +#endif + sender_unlock(host->sender); freez(s->pipe_buffer); @@ -1277,7 +1497,61 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) { #endif } +static bool stream_sender_log_capabilities(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state) + return false; + + stream_capabilities_to_string(wb, state->capabilities); + return true; +} + +static bool stream_sender_log_transport(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state) + return false; + +#ifdef ENABLE_HTTPS + buffer_strcat(wb, SSL_connection(&state->ssl) ? "https" : "http"); +#else + buffer_strcat(wb, "http"); +#endif + return true; +} + +static bool stream_sender_log_dst_ip(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state || state->rrdpush_sender_socket == -1) + return false; + + SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket); + buffer_strcat(wb, peers.peer.ip); + return true; +} + +static bool stream_sender_log_dst_port(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state || state->rrdpush_sender_socket == -1) + return false; + + SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket); + buffer_print_uint64(wb, peers.peer.port); + return true; +} + void *rrdpush_sender_thread(void *ptr) { + struct sender_state *s = ptr; + + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_STR(NDF_NIDL_NODE, s->host->hostname), + ND_LOG_FIELD_CB(NDF_DST_IP, stream_sender_log_dst_ip, s), + ND_LOG_FIELD_CB(NDF_DST_PORT, stream_sender_log_dst_port, s), + ND_LOG_FIELD_CB(NDF_DST_TRANSPORT, stream_sender_log_transport, s), + ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_sender_log_capabilities, s), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_register("STREAMSND"); worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect"); worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read"); @@ -1296,6 +1570,7 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION, "disconnect cant upgrade"); worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request"); worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function"); @@ -1303,10 +1578,11 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT); worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, "bytes compressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, "bytes uncompressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, "cumulative compression savings ratio", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE); - struct sender_state *s = ptr; - if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination || !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key || !*s->host->rrdpush_send_api_key) { @@ -1342,6 +1618,9 @@ void *rrdpush_sender_thread(void *ptr) { "initial clock resync iterations", remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING + s->parent_using_h2o = appconfig_get_boolean( + &stream_config, CONFIG_SECTION_STREAM, "parent using h2o", false); + // initialize rrdpush globals rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); @@ -1397,7 +1676,10 @@ void *rrdpush_sender_thread(void *ptr) { s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); - netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "STREAM %s [send to %s]: enabling metrics streaming...", + rrdhost_hostname(s->host), s->connected_to); continue; } @@ -1423,6 +1705,15 @@ void *rrdpush_sender_thread(void *ptr) { rrdpush_sender_pipe_clear_pending_data(s); rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false); } + + if(s->compressor.initialized) { + size_t bytes_uncompressed = s->compressor.sender_locked.total_uncompressed; + size_t bytes_compressed = s->compressor.sender_locked.total_compressed + s->compressor.sender_locked.total_compressions * sizeof(rrdpush_signature_t); + NETDATA_DOUBLE ratio = 100.0 - ((NETDATA_DOUBLE)bytes_compressed * 100.0 / (NETDATA_DOUBLE)bytes_uncompressed); + worker_set_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_uncompressed); + worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, (NETDATA_DOUBLE)bytes_compressed); + worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, ratio); + } sender_unlock(s); worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size); @@ -1459,7 +1750,7 @@ void *rrdpush_sender_thread(void *ptr) { } }; - int poll_rc = poll(fds, 2, 1000); + int poll_rc = poll(fds, 2, 50); // timeout in milliseconds netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...", fds[Collector].revents, fds[Socket].revents, outstanding); diff --git a/streaming/stream.conf b/streaming/stream.conf index 94e94cab7..36213af02 100644 --- a/streaming/stream.conf +++ b/streaming/stream.conf @@ -170,6 +170,9 @@ # You can control stream compression in this parent agent stream with options: yes | no #enable compression = yes + # select the order the compression algorithms will be used, when multiple are offered by the child + #compression algorithms order = zstd lz4 brotli gzip + # Replication # Enable replication for all hosts using this api key. Default: enabled #enable replication = yes @@ -180,6 +183,11 @@ # The duration we want to replicate per each step. #replication_step = 600 + # Indicate whether this child is an ephemeral node. An ephemeral node will become unavailable + # after the specified duration of "cleanup ephemeral hosts after secs" (as defined in the db section of netdata.conf) + # from the time of the node's last connection. + #is ephemeral node = false + # ----------------------------------------------------------------------------- # 3. PER SENDING HOST SETTINGS, ON PARENT NETDATA # THIS IS OPTIONAL - YOU DON'T HAVE TO CONFIGURE IT @@ -250,3 +258,8 @@ # The duration we want to replicate per each step. #replication_step = 600 + + # Indicate whether this child is an ephemeral node. An ephemeral node will become unavailable + # after the specified duration of "cleanup ephemeral hosts after secs" (as defined in the db section of netdata.conf) + # from the time of the node's last connection. + #is ephemeral node = false |